三维重建之colmap+openmvs
本文介绍了基于Docker的3D重建环境配置流程。首先通过Docker搭建Ubuntu 20.04环境并配置GUI功能,然后详细记录了COLMAP和OpenMVS的编译安装过程,包括依赖安装、源码编译和错误修正。其中COLMAP安装需解决apt安装失败问题,OpenMVS编译需修改代码错误并处理依赖关系。最后配置Python环境运行3D重建代码,解决了typing_extensions版本问题。整
一.环境配置
使用docker环境辅助环境配置。
1.拉取Docker镜像
sudo docker pull ubuntu:20.04
拉取的为ununtu20版本镜像,环境十分干净,可以通过以下命令查看容器列表
sudo docker images
如果想删除多余的docker image,可以使用指令
sudo docker rmi -f <id>
2.创建容器
sudo docker run -it -v /home/pc/docker/openmvs:/home/pc/docker/openmvs --name=openmvs --net=host --env="DISPLAY" --volume="$HOME/.Xauthority:/root/.Xauthority:rw" ubuntu:20.04 /bin/bash
基于ubuntu20.04建立容器,并开启GUI功能。
进入容器后,会在root命令下,到此我们便得到了一个纯净的ubuntu环境。
可以使用sudo docker ps查看正在运行的容器,sudo docker ps -a查看所有容器
当我想要退出时,输入exit退出容器,然后使用sudo docker stop openmvs来停止容器,当我想再次进入容器时,使用sudo docker start openmvs启动容器,再使用sudo docker exec -it openmvs bash进入容器
3.环境配置
3.1.colmap
功能: COLMAP 是一个集成的 SfM 和 MVS 工具,可以自动从多视图图像集中计算相机位姿并构建高精度的三维点云。
特点:
- 完整的 SfM 和 MVS 管道,自动化程度高。
- 支持密集点云重建。
- 强大的图形用户界面和命令行支持。
适用场景: 适合科研、文化遗产保护、建筑测绘等需要高精度三维重建的应用场景。(NeRF和3DGS都使用)
为什么记录腻,apt安装失败咯,源码搞起。
先安装依赖:
apt-get install \
git \
cmake \
ninja-build \
build-essential \
libboost-program-options-dev \
libboost-filesystem-dev \
libboost-graph-dev \
libboost-system-dev \
libeigen3-dev \
libflann-dev \
libfreeimage-dev \
libmetis-dev \
libgoogle-glog-dev \
libgtest-dev \
libsqlite3-dev \
libglew-dev \
qtbase5-dev \
libqt5opengl5-dev \
libcgal-dev \
libceres-dev
拉取colmap,编译
git clone https://github.com/colmap/colmap.git
cd colmap
mkdir build
cd build
cmake .. -GNinja
ninja -j8
ninja install
编译安装完成之后,输入指令查看是否安装成功
colmap -h
3.2.openmvs
安装指南(官网):https://github.com/cdcseacave/openMVS#build
安装vcg:
git clone -b devel https://github.com/cnr-isti-vclab/vcglib.git #vcg只有头文件,不用编译
# 但要修改一处代码: HETYPE ht=*this; ==》 MTTYPE ht=*this;
遇到一处编译错误,更改代码即可:HETYPE ht=*this; ==》 MTTYPE ht=*this;
安装eigen3.4:
cd ~
wget https://gitlab.com/libeigen/eigen/-/archive/3.4.0/eigen-3.4.0.tar.gz
tar -xzf eigen-3.4.0.tar.gz
cd eigen-3.4.0
mkdir build && cd build
cmake ..
sudo make install
安装glfw3 (可选):
sudo apt‐get install freeglut3‐dev libglew‐dev libglfw3‐dev
下载、编译openMVS:
git clone https://github.com/cdcseacave/openMVS
mkdir openMVS_build
cd openMVS_build
#Cmake 配置下
cmake .. -DCMAKE_BUILD_TYPE=Release -DVCG_ROOT=../../vcglib/ -DEigen3_INCLUDE_DIR=/usr/local/include/eigen3 -DEIGEN3_INCLUDE_DIR=/usr/local/include/eigen3 -DCMAKE_PREFIX_PATH="/usr/local"
#make编译下:
make -j4
其中可能会缺少一些依赖:
apt-get install python3-dev
apt-get install libboost-iostreams-dev
apt-get install libopencv-dev
在make的时候也许会出现如下错误:
1.error: missing binary operator before token "(" 225 | #if defined(__has_builtin) && __has_builtin(__builtin_debugtrap)
直接定位错误,将原先的#if defined(__has_builtin) && __has_builtin(__builtin_debugtrap)改为
2.fatal error: CGAL/AABB_traits_3.h: No such file or directory 41 | #include <CGAL/AABB_traits_3.h>
经检查,是代码部分有误,应该为CGAL/AABB_traits.h,同样的错误还有下一行的CGAL/AABB_triangle_primitive.h
到此,编译结束。
把编译好的 `openMVS_build/bin` 下的全部二进制文件放入 `openMVS` 文件夹中。当前的文件结构为
其中code目录为重建代码目录,其他为辅助的下载的文件。现在就需要将openMVS/openMVS_build/bin下的全部二进制文件放入 code/openMVS 文件夹中。
二.执行
当前code目录下的文件结构如:
还需要安装一些python以来,如
pip install open3d
pip install opencv-python
pip install trimesh
执行代码python3 colmap_mvs_reconstruction_v2.py遇到报错
解决方法:pip install --upgrade typing_extensions
之后即可顺利跑通代码。
其中代码colmap_mvs_reconstruction_v2.py如下:
import subprocess
import os
import numpy as np
from pathlib import Path
import shutil
import open3d as o3d
import cv2
import time
import trimesh
from PIL import Image
import pickle
from tqdm import tqdm
def video2img(video_path, img_save_path, fps=5, comp='0'):
_, ext = os.path.splitext(video_path)
ext = ext.lower()
video_extensions = {
'.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv', '.mpeg', '.mpg',
'.m4v', '.3gp', '.webm', '.ogg'
}
# 视频格式
frames = []
os.makedirs(img_save_path, exist_ok=True)
if ext in video_extensions:
print('输入视频文件格式为', ext)
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise RuntimeError(f"无法打开视频文件: {video_path}")
# 获取视频的总帧数和帧率
# total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
video_fps = cap.get(cv2.CAP_PROP_FPS)
# 计算要提取的帧间隔
interval = max(1, int(video_fps / fps))
frame_number = 0
print('正在提取帧...')
while True:
ret, frame = cap.read()
if not ret:
break
if frame_number % interval == 0:
frames.append(frame)
frame_number += 1
cap.release()
print('正在保存图像...')
# 获取文件名(除去扩展名)
filename = os.path.basename(video_path).split('.')[0]
for i, frame in enumerate(frames):
img_id = ''.join([comp for k in range(len(str(len(frames))) - len(str(i+1)))]) + str(i+1)
cv2.imwrite(os.path.join(img_save_path, f'{filename}-{img_id}.jpg'), frame)
print(f'共输出 {len(frames)} 张图像')
def draw_camera_o3d(c2w, cam_width=0.32/2, cam_height=0.24/2, f=0.10, color=[0, 1, 0], show_axis=True):
points = [[0, 0, 0], [-cam_width, -cam_height, f], [cam_width, -cam_height, f],
[cam_width, cam_height, f], [-cam_width, cam_height, f]]
lines = [[0, 1], [0, 2], [0, 3], [0, 4], [1, 2], [2, 3], [3, 4], [4, 1]]
colors = [color for i in range(len(lines))]
line_set = o3d.geometry.LineSet()
line_set.points = o3d.utility.Vector3dVector(points)
line_set.lines = o3d.utility.Vector2iVector(lines)
line_set.colors = o3d.utility.Vector3dVector(colors)
line_set.transform(c2w)
if show_axis:
axis = o3d.geometry.TriangleMesh.create_coordinate_frame()
axis.scale(min(cam_width, cam_height), np.array([0., 0., 0.]))
axis.transform(c2w)
return [line_set, axis]
else:
return [line_set]
def draw_camera_tmsh(c2w, img=None, cam_width=0.32/2, cam_height=0.24/2, f=0.10, img_rate=0.1):
points = [[0, 0, 0], [-cam_width, -cam_height, f], [cam_width, -cam_height, f],
[cam_width, cam_height, f], [-cam_width, cam_height, f]]
for i, p in enumerate(points):
p = np.array(p)
p = c2w[:3, :3] @ p + c2w[:3, 3]
points[i] = p.tolist()
line_segments = [
np.array([points[0], points[1]]),
np.array([points[0], points[2]]),
np.array([points[0], points[3]]),
np.array([points[0], points[4]]),
np.array([points[1], points[2]]),
np.array([points[2], points[3]]),
np.array([points[3], points[4]]),
np.array([points[4], points[1]]),
]
if img is not None:
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, (int(img_rate*img.shape[1]), int(img_rate*img.shape[0])))
R_yz = np.array([
[1, 0, 0],
[0, -1, 0],
[0, 0, -1]
])
points = [[0, 0, 0], [-cam_width, -cam_height, -f], [cam_width, -cam_height, -f],
[cam_width, cam_height, -f], [-cam_width, cam_height, -f]]
for i, p in enumerate(points):
p = np.array(p)
p = c2w[:3, :3] @ R_yz @ p + c2w[:3, 3]
points[i] = p.tolist()
uv = np.array([
[0, 0], # 顶点 0 的纹理坐标
[1, 0], # 顶点 1 的纹理坐标
[1, 1], # 顶点 2 的纹理坐标
[0, 1] # 顶点 3 的纹理坐标
])
texture = trimesh.visual.texture.TextureVisuals(image=Image.fromarray(img.astype(np.uint8)), uv=uv)
plane = trimesh.Trimesh(vertices=[points[1], points[2], points[3], points[4]],
faces=[[0, 1, 2], [2, 3, 0]], visual=texture)
else:
plane = None
return line_segments, plane
def quaternion_to_rotation_matrix(qw, qx, qy, qz):
"""
四元数转旋转矩阵
"""
norm = np.sqrt(qw**2 + qx**2 + qy**2 + qz**2)
qw, qx, qy, qz = qw/norm, qx/norm, qy/norm, qz/norm
rotation_matrix = np.array([
[1 - 2 * qy * qy - 2 * qz * qz, 2 * qx * qy - 2 * qz * qw, 2 * qx * qz + 2 * qy * qw],
[2 * qx * qy + 2 * qz * qw, 1 - 2 * qx * qx - 2 * qz * qz, 2 * qy * qz - 2 * qx * qw],
[2 * qx * qz - 2 * qy * qw, 2 * qy * qz + 2 * qx * qw, 1 - 2 * qx * qx - 2 * qy * qy]
])
return rotation_matrix
def run_command(command, info=True):
"""
在终端中运行命令
"""
try:
if info:
print(f"Running command: {' '.join(command)}")
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # 将 stderr 重定向到 stdout
text=True, # 以文本模式处理输出
bufsize=1, # 行缓冲
universal_newlines=True # 兼容旧版本 Python
)
# 实时读取子进程的输出
with process.stdout:
for line in iter(process.stdout.readline, ''):
print(line, end='') # 实时打印输出
# 等待子进程结束并获取返回码
process.wait()
# return_code = process.returncode
return True
except subprocess.CalledProcessError as e:
if info:
print(f"Error running command: {' '.join(command)}")
print(e.stderr)
return False
def get_poses_colmap(poses_path):
with open(poses_path, 'r') as f:
lines = f.readlines()
poses = [lines[i] for i in range(4, len(lines), 2)]
points2D = [lines[i] for i in range(5, len(lines), 2)]
# for pts2D in points2D:
# print(len(pts2D.split()))
poses_all = {}
for p in poses:
parts = p.split()
image_id = int(parts[0])
qw, qx, qy, qz = map(float, parts[1:5]) # 四元数
tx, ty, tz = map(float, parts[5:8]) # 平移向量
image_name = parts[9] # 图像名称
# 四元数转旋转矩阵
rotation_matrix = quaternion_to_rotation_matrix(qw, qx, qy, qz)
rotation_matrix = np.linalg.inv(rotation_matrix)
# 构建4x4齐次变换矩阵
transformation_matrix = np.eye(4)
transformation_matrix[:3, :3] = rotation_matrix
transformation_matrix[:3, 3] = - rotation_matrix @ np.array([tx, ty, tz])
# poses_all.append(transformation_matrix)
poses_all[image_name] = transformation_matrix
return poses_all
def get_camera_colmap(cameras_path):
with open(cameras_path, 'r') as f:
ca = f.readlines()[3].split()
# print(ca)
if ca[1] == 'PINHOLE':
fx = float(ca[4])
fy = float(ca[5])
cx = float(ca[6])
cy = float(ca[7])
elif ca[1] == 'SIMPLE_PINHOLE':
fx = float(ca[4])
fy = fx
cx = float(ca[5])
cy = float(ca[6])
# 创建3x3矩阵
K = np.array([
[fx, 0, cx],
[0, fy, cy],
[0, 0, 1]
])
return K
def average_nearest_distance(vectors):
"""
计算每个向量与其他向量之间的最近距离,并返回这些距离的平均值。
:param vectors: List of np.array,每个元素是一个三维向量
:return: 平均最近距离的浮点数
"""
if not vectors:
raise ValueError("向量列表为空。")
# 将列表转换为二维 NumPy 数组,形状为 (n, 3)
vectors_array = np.array(vectors) # 形状: (n, 3)
n = vectors_array.shape[0]
if n < 2:
raise ValueError("向量列表中至少需要包含两个向量。")
# 计算所有向量对之间的欧几里得距离
# 使用广播机制计算距离矩阵
# vectors_array[:, np.newaxis, :] 的形状为 (n, 1, 3)
# vectors_array[np.newaxis, :, :] 的形状为 (1, n, 3)
# 差值的形状为 (n, n, 3)
diff = vectors_array[:, np.newaxis, :] - vectors_array[np.newaxis, :, :] # 形状: (n, n, 3)
distance_matrix = np.linalg.norm(diff, axis=2) # 形状: (n, n)
# 排除对角线元素(自身与自身的距离)
np.fill_diagonal(distance_matrix, np.inf) # 将对角线元素设为无穷大
# 找到每个向量的最近距离
min_distances = np.min(distance_matrix, axis=1) # 形状: (n,)
# 计算平均最近距离
average_distance = np.mean(min_distances)
return average_distance
class TimerInfo:
def __init__(self):
self.t_origin = time.time()
self.t_now = self.t_origin
def update(self, info):
del_t = time.time() - self.t_now
del_t_all = time.time() - self.t_origin
self.t_now = time.time()
print(f'{info} 用时{self.time2str(del_t)} 累计用时{self.time2str(del_t_all)}')
def time2str(self, t):
if t < 0.1:
return f'{round(t*1000)} ms'
else:
hour_t = int(t) // 3600
min_t = int(t) // 60 - hour_t * 60
sec_t = t - min_t * 60 - hour_t * 3600
res = ''
if hour_t:
res += f' {hour_t}h'
if min_t:
res += f' {min_t}min'
if sec_t:
res += f' {round(sec_t, 1)}s'
return res
class Colmap_OpenMVS_Reconstruction:
def __init__(self, workspace, openMVS_path='openMVS'):
self.workspace = os.path.abspath(workspace)
os.makedirs(workspace, exist_ok=True)
self.cam_ids = []
self.openmvs_bin = os.path.abspath(openMVS_path)
self.ti = TimerInfo()
self.load_flag = False
self.mvsflag = True
# 检查colmap是否已经安装
self.colmap_flag = run_command('colmap')
if self.colmap_flag:
print('成功检测到安装好的colmap!')
else:
print('未检测到colmap')
# 检查 openmvs 路径有效性
if not os.path.isdir(openMVS_path):
print(f"错误:openmvs 目录 '{openMVS_path}' 不存在。")
self.mvsflag = False
# exit(1)
else:
missing_files = []
for file in ['InterfaceCOLMAP', 'DensifyPointCloud', 'ReconstructMesh', 'RefineMesh', 'TextureMesh']:
if not os.path.isfile(os.path.join(openMVS_path, file)):
missing_files.append(file)
if missing_files:
print('openmvs 文件不完整!')
self.mvsflag = False
else:
print('openmvs 路径检查成功!')
def reconstruction(self, file_path, fps=1, cam_id=0, info=True, sparse=True, dense=True, mesh=True):
self.ti.__init__()
self.load_flag = False
img_path = self.get_dataset(file_path, fps)
self.ti.update('数据集读取完毕')
# sparse = sparse and self.colmap_flag
# dense = dense and sparse and self.mvsflag
# mesh = mesh and dense
# 稀疏建图
if sparse:
# 特征提取和稀疏重建
print('正在特征提取...')
feature_extract_cmd = [
"colmap", "feature_extractor",
"--database_path", os.path.join(self.workspace, 'database.db'),
"--image_path", os.path.abspath(img_path),
"--ImageReader.camera_model", "PINHOLE",
"--ImageReader.single_camera", "1"
]
run_command(feature_extract_cmd, info)
self.ti.update('特征提取完毕')
print('正在特征匹配...')
exhaustive_match_cmd = [
"colmap", "exhaustive_matcher",
"--database_path", os.path.join(self.workspace, 'database.db')
]
run_command(exhaustive_match_cmd, info)
self.ti.update('特征匹配完毕')
print('正在稀疏重建...')
os.makedirs(f'{self.workspace}/reconstruction', exist_ok=True)
mapper_command = [
"colmap", "mapper",
"--database_path", os.path.join(self.workspace, 'database.db'),
"--image_path", os.path.abspath(img_path),
"--output_path", os.path.join(self.workspace, 'reconstruction'),
]
run_command(mapper_command, info)
self.ti.update('稀疏重建完毕')
# 获得相机编号
self.cam_ids = []
reconstruction_path = os.path.join(self.workspace, 'reconstruction')
entries = os.listdir(reconstruction_path)
directories = [entry for entry in entries if os.path.isdir(os.path.join(reconstruction_path, entry))]
for directory in directories:
if directory.isdigit():
self.cam_ids.append(int(directory))
# print()
print('已经获取到的相机编号:', self.cam_ids)
if len(self.cam_ids) == 1:
print('数据集质量验证通过!只生成了单一完整视角轨迹')
else:
print('数据集质量过低或导入了多视角数据集,可以尝试增大fps增加数据集质量')
return
if not cam_id in self.cam_ids:
print('所需相机编号不存在!')
return
# 导出相机位姿和稀疏点云
print('正在导出相机位姿和稀疏点云...')
export_camera_cmd = [
"colmap", "model_converter",
"--input_path", os.path.join(self.workspace, f'reconstruction/{cam_id}'),
"--output_path", os.path.join(self.workspace, f'reconstruction/{cam_id}'),
"--output_type", "TXT"
]
run_command(export_camera_cmd, info)
export_sparse_map = [
"colmap", "model_converter",
"--input_path", os.path.join(self.workspace, f'reconstruction/{cam_id}'),
"--output_path", os.path.join(self.workspace, 'sparse.ply'),
"--output_type", "PLY"
]
run_command(export_sparse_map, info)
self.ti.update('相机位姿和稀疏点云导出完毕')
self.clear_ext(os.path.join(self.workspace, f'reconstruction/{cam_id}'), 'bin')
self.poses_all = get_poses_colmap(os.path.join(self.workspace, f'reconstruction/{cam_id}/images.txt'))
self.K = get_camera_colmap(os.path.join(self.workspace, f'reconstruction/{cam_id}/cameras.txt'))
# 稠密建图准备
if os.path.exists(os.path.join(self.openmvs_bin, 'images')):
shutil.rmtree(os.path.join(self.openmvs_bin, 'images'))
shutil.copytree(os.path.join(self.workspace, 'images'), os.path.join(self.openmvs_bin, 'images'))
os.makedirs(os.path.join(self.workspace, 'sparse'), exist_ok=True)
shutil.copyfile(os.path.join(self.workspace, f'reconstruction/{cam_id}/images.txt'),
os.path.join(self.workspace, f'sparse/images.txt'))
shutil.copyfile(os.path.join(self.workspace, f'reconstruction/{cam_id}/cameras.txt'),
os.path.join(self.workspace, f'sparse/cameras.txt'))
shutil.copyfile(os.path.join(self.workspace, f'reconstruction/{cam_id}/points3D.txt'),
os.path.join(self.workspace, f'sparse/points3D.txt'))
orgin_dir = os.getcwd()
# 稠密建图
if dense:
os.chdir(self.openmvs_bin)
# 转换为mvs格式
os.makedirs(f'{self.workspace}/dense', exist_ok=True)
print('正在把colmap格式转换为mvs格式...')
get_mvs_cmd = [os.path.join(self.openmvs_bin, "InterfaceCOLMAP"),
"-i", self.workspace,
"-o", os.path.join(self.workspace, 'dense/sparse.mvs')]
run_command(get_mvs_cmd, info)
self.ti.update('mvs格式转换完毕')
print('正在稠密化点云...')
densify_cmd = [os.path.join(self.openmvs_bin, "DensifyPointCloud"),
'-w', os.path.join(self.workspace, 'dense/'),
'-i', os.path.join(self.workspace, 'dense/sparse.mvs'),
'-o', os.path.join(self.workspace, 'dense/sparse_dense.mvs')
]
run_command(densify_cmd, info)
shutil.copyfile(os.path.join(self.workspace, 'dense/sparse_dense.ply'),
os.path.join(self.workspace, 'dense.ply'))
self.ti.update('点云稠密化完毕')
os.chdir(orgin_dir)
# 网格化
if mesh:
os.chdir(self.openmvs_bin)
print('正在网格重建...')
reconstruct_cmd = [os.path.join(self.openmvs_bin, "ReconstructMesh"),
'-w', os.path.join(self.workspace, 'dense/'),
'-i', os.path.join(self.workspace, "dense/sparse_dense.mvs"),
'-o', os.path.join(self.workspace, "dense/sparse_dense.mvs"),
]
run_command(reconstruct_cmd, info)
self.ti.update('网格重建完毕')
print('正在网格优化...')
refine_cmd = [os.path.join(self.openmvs_bin, "RefineMesh"),
'-w', os.path.join(self.workspace, 'dense/'),
'-i', os.path.join(self.workspace, "dense/sparse_dense.mvs"),
'-o', os.path.join(self.workspace, 'dense/sparse_dense.mvs')]
run_command(refine_cmd, info)
self.ti.update('网格优化完毕')
print('正在纹理贴图...')
texture_cmd = [os.path.join(self.openmvs_bin, "TextureMesh"),
'-w', os.path.join(self.workspace, 'dense/'),
'-i', os.path.join(self.workspace, "dense/sparse_dense.mvs"),
'-o', os.path.join(self.workspace, 'dense/sparse_dense.mvs')]
run_command(texture_cmd, info)
mesh0 = trimesh.load(os.path.join(self.workspace, 'dense/sparse_dense.ply'))
os.makedirs(os.path.join(self.workspace, 'mesh/'), exist_ok=True)
mesh0.export(os.path.join(self.workspace, 'mesh/mesh.obj'))
self.ti.update('纹理贴图完毕')
os.chdir(orgin_dir)
# 清理冗余文件
self.clear_ext(self.openmvs_bin, 'log')
self.clear_ext(self.openmvs_bin, 'dmap')
self.clear_ext(self.workspace, 'log')
self.clear_ext(os.path.join(self.workspace), 'db')
if os.path.exists(os.path.join(self.openmvs_bin, 'images')):
shutil.rmtree(os.path.join(self.openmvs_bin, 'images'))
if os.path.exists(os.path.join(self.workspace, 'dense')):
shutil.rmtree(os.path.join(self.workspace, 'dense'))
if os.path.exists(os.path.join(self.workspace, 'sparse')):
shutil.rmtree(os.path.join(self.workspace, 'sparse'))
def get_dataset(self, file_path, fps):
# 清理之前存在的项目文件
if os.path.exists(os.path.join(self.workspace, 'images')):
shutil.rmtree(os.path.join(self.workspace, 'images'))
print('清除历史图像文件')
print('正在读取数据集...')
if os.path.isdir(file_path):
print('输入路径为图片文件夹,自动提取图像文件')
img_path = file_path
os.makedirs(os.path.join(self.workspace, 'images'), exist_ok=True)
extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff', '.tif', '.raw']
for filename in os.listdir(file_path):
file_ext = os.path.splitext(filename)[1].lower()
if file_ext in extensions:
source_file = os.path.join(file_path, filename)
destination_file = os.path.join(self.workspace, 'images', filename)
shutil.copy2(source_file, destination_file)
else:
video2img(file_path, os.path.join(self.workspace, 'images'), fps)
img_path = os.path.join(self.workspace, 'images')
return img_path
def clear_ext(self, filedir, ext):
path = Path(filedir)
log_files = list(path.rglob(f"*.{ext}"))
for file in log_files:
file.unlink()
def load(self, cam_id=0):
# 导入相机参数
try:
self.poses_all = get_poses_colmap(os.path.join(self.workspace, f'reconstruction/{cam_id}/images.txt'))
print('camera poses loaded!')
self.K = get_camera_colmap(os.path.join(self.workspace, f'reconstruction/{cam_id}/cameras.txt'))
print('camera intrinsic prams loaded!')
print(self.K)
except:
pass
# 读取图像
try:
image_pose_all = []
for image_name, pose in self.poses_all.items():
if os.path.exists(os.path.join(self.workspace, 'images', image_name)):
img = cv2.imread(os.path.join(self.workspace, 'images', image_name))
else:
img = None
image_pose = {
'name':image_name,
'image':img,
'pose':pose
}
# print('load image and pose :', image_name)
image_pose_all.append(image_pose)
self.image_pose_all = image_pose_all
# 求出相机的合适大小
avgd = average_nearest_distance([imgps['pose'][:3, 3] for imgps in self.image_pose_all])
H, W = self.image_pose_all[0]['image'].shape[:2]
# print(H, W)
self.cam_width, self.cam_height = avgd, avgd / W * H
self.cam_f = avgd / W * (self.K[0, 0] + self.K[1, 1]) / 2
except:
pass
# 导入稀疏点云
try:
pcd = o3d.io.read_point_cloud(os.path.join(self.workspace, 'sparse.ply'))
print('sparse point cloud loaded!')
self.sparse_pcd = pcd
except:
pass
# 导入稠密点云
try:
pcd = o3d.io.read_point_cloud(os.path.join(self.workspace, 'dense.ply'))
print('dense point cloud loaded!')
self.dense_pcd = pcd
except:
pass
def show_pcd(self, mode='sparse', interval_rate = 0.4, show_pose=True):
if not self.load_flag:
self.load()
self.load_flag = True
try:
if mode == 'sparse':
pcd = self.sparse_pcd
elif mode == 'dense':
pcd = self.dense_pcd
except:
return
vis = o3d.visualization.VisualizerWithKeyCallback()
vis.create_window(width=1920, height=1080)
vis.add_geometry(pcd)
if show_pose:
for image_name, pose in self.poses_all.items():
for geometry in draw_camera_o3d(pose, self.cam_width * interval_rate, self.cam_height * interval_rate, self.cam_f * interval_rate):
vis.add_geometry(geometry)
opt = vis.get_render_option()
opt.point_size = 2
opt.background_color = np.array([0, 0, 0])
while vis.poll_events():
vis.update_renderer()
vis.destroy_window()
def show_mesh(self, interval_rate=0.4, show_pose=True):
if not self.load_flag:
self.load()
self.load_flag = True
mesh = trimesh.load(os.path.join(self.workspace, 'mesh/mesh.obj'))
scene = trimesh.Scene()
scene.add_geometry(mesh)
if show_pose:
all_lines = []
all_img_planes = []
for p in self.image_pose_all:
line_segments, plane = draw_camera_tmsh(p['pose'], p['image'], self.cam_width * interval_rate, self.cam_height * interval_rate, self.cam_f * interval_rate)
all_lines = all_lines + line_segments
if plane is not None:
all_img_planes.append(plane)
cameras = trimesh.load_path(all_lines)
line_color = np.array([0, 255, 0, 255]) # 红色,RGBA
cameras.colors = np.tile(line_color, (len(cameras.entities), 1))
scene.add_geometry(cameras)
if all_img_planes:
scene.add_geometry(all_img_planes)
# scene.camera.background_color = [0, 0, 0, 0]
print('快捷键 w: 是否仅显示网格')
print('快捷键 c: 是否双面显示材质')
scene.show()
def export_NeuralRecon_data(self, output_path, scene_name, output_shape = (640, 480)):
if not self.load_flag:
self.load()
self.load_flag = True
os.makedirs(os.path.join(output_path, 'images'), exist_ok=True)
os.makedirs(os.path.join(output_path, 'poses'), exist_ok=True)
os.makedirs(os.path.join(output_path, 'intrinsics'), exist_ok=True)
K_now = self.K.copy()
K_now[0, 0] *= output_shape[0] / K_now[0, 2] / 2
K_now[1, 1] *= output_shape[1] / K_now[1, 2] / 2
K_now[0, 2] *= output_shape[0] / 2
K_now[1, 2] *= output_shape[1] / 2
# 初始化片段信息列表
fragments = []
# 遍历每个图像-位姿对
for idx, data in enumerate(self.image_pose_all):
image_name = data.get('name', f"{idx}.jpg")
image = data.get('image')
pose = data.get('pose')
if image is None:
raise ValueError(f"Image data is missing for entry with name '{image_name}'.")
if pose is None:
raise ValueError(f"Pose data is missing for entry with name '{image_name}'.")
# 保存图像
img_id = ''.join(['0' for k in range(len(str(len(self.image_pose_all))) - len(str(idx)))]) + str(idx)
# img_id = str(idx)
image_path = os.path.join(os.path.join(output_path, 'images'), f'{img_id}.jpg')
pose_path = os.path.join(os.path.join(output_path, 'poses'), f'{img_id}.txt')
intrinsic_path = os.path.join(os.path.join(output_path, 'intrinsics'), f'{img_id}.txt')
cv2.imwrite(image_path, image)
np.savetxt(pose_path, pose, delimiter=' ', comments='')
np.savetxt(intrinsic_path, self.K, delimiter=' ', comments='')
fragments.append({
'scene': scene_name,
'fragment_id': 0,
'image_ids': idx,
'extrinsics': pose, # 假设 pose 是 4x4 矩阵
'intrinsics': self.K # 假设所有图像使用相同的内参 K
})
# 保存 fragments.pkl
fragments_pkl_path = os.path.join(output_path, 'fragments.pkl')
with open(fragments_pkl_path, 'wb') as f:
pickle.dump(fragments, f)
print(f"Saved fragments data to: {fragments_pkl_path}")
# 保存 fragments.pkl
fragments_pkl_path = os.path.join(output_path, 'fragments.pkl')
with open(fragments_pkl_path, 'wb') as f:
pickle.dump(fragments, f)
print(f"Saved fragments data to: {fragments_pkl_path}")
print("Data saving completed successfully.")
process_data(output_path)
def process_data(data_path, data_source='ARKit', window_size=9, min_angle=15, min_distance=0.1):
"""
处理已存在的图像、相机内参和相机外参数据,并生成 fragments.pkl 文件。
使用图像文件名(去除扩展名)作为标签。
参数:
- data_path (str): 数据所在的根目录路径。
- data_source (str): 数据来源,默认为 'ARKit'。
- window_size (int): 窗口大小,用于选择关键帧,默认为9。
- min_angle (float): 最小角度阈值(度),用于选择关键帧,默认为15。
- min_distance (float): 最小距离阈值(米),用于选择关键帧,默认为0.1。
"""
# 加载相机内参和相机外参
print('加载相机内参和相机外参...')
# 假设 poses 和 intrinsics 文件夹中每个文件对应一个图像文件(去除扩展名)
def load_data(folder, data_type):
data_dict = {}
# 获取所有图像文件名(去除扩展名)
image_filenames = [
os.path.splitext(filename)[0]
for filename in os.listdir(os.path.join(data_path, 'images'))
if filename.endswith(('.jpg', '.png', '.jpeg', '.bmp', '.tiff'))
]
for filename in sorted(image_filenames):
file_path = os.path.join(data_path, folder, f"{filename}.txt")
if not os.path.exists(file_path):
print(f"警告: 未找到文件 '{file_path}',跳过该图像。")
continue
try:
id = filename # 使用图像文件名作为 ID
if data_type == 'intrinsic':
data = np.loadtxt(file_path, delimiter=' ')
data_dict[id] = {'K': data}
elif data_type == 'pose':
data = np.loadtxt(file_path, delimiter=' ')
if data.shape != (4, 4):
raise ValueError("外参矩阵必须是 4x4 的齐次变换矩阵。")
data_dict[id] = data
except Exception as e:
print(f"错误: 加载文件 '{file_path}' 时出错: {e}")
continue
return data_dict
cam_intrinsic_dict = load_data('intrinsics', 'intrinsic')
cam_pose_dict = load_data('poses', 'pose')
if not cam_intrinsic_dict:
raise ValueError("未找到有效的相机内参数据。请检查 'intrinsics' 文件夹中的文件。")
if not cam_pose_dict:
raise ValueError("未找到有效的相机外参数据。请检查 'poses' 文件夹中的文件。")
# 对相机内参进行缩放(如果需要)
# 如果原始图像大小与当前大小不同,需要调整内参
# 这里假设当前大小为 size=(640, 480)
# size = (640, 480)
# ori_size = (1920, 1440) # 原始图像大小,根据实际情况调整
# for k, v in tqdm(cam_intrinsic_dict.items(), desc='处理相机内参...'):
# v['K'][0, :] /= (ori_size[0] / size[0])
# v['K'][1, :] /= (ori_size[1] / size[1])
# 选择关键帧
print('选择关键帧...')
fragments = []
all_ids = []
ids = []
count = 0
last_pose = None
# 获取所有图像文件名(去除扩展名),用于遍历
image_filenames = [
os.path.splitext(filename)[0]
for filename in os.listdir(os.path.join(data_path, 'images'))
if filename.endswith(('.jpg', '.png', '.jpeg', '.bmp', '.tiff'))
]
for filename in tqdm(sorted(image_filenames), desc='关键帧选择...'):
cam_intrinsic = cam_intrinsic_dict.get(filename)
cam_pose = cam_pose_dict.get(filename)
if cam_intrinsic is None or cam_pose is None:
print(f"警告: 未找到 ID 为 '{filename}' 的相机内参或外参,跳过该帧。")
continue
if count == 0:
ids.append(filename)
last_pose = cam_pose
count += 1
else:
# 计算旋转角度
rotation_diff = np.dot(np.linalg.inv(last_pose[:3, :3]), cam_pose[:3, :3])
angle_rad = np.arccos(
np.clip(
(np.trace(rotation_diff) - 1) / 2,
-1.0,
1.0
)
)
angle_deg = np.degrees(angle_rad)
# 计算平移距离
dis = np.linalg.norm(cam_pose[:3, 3] - last_pose[:3, 3])
if angle_deg > min_angle or dis > min_distance:
ids.append(filename)
last_pose = cam_pose
count += 1
if count == window_size:
all_ids.append(ids)
ids = []
count = 0
# 处理剩余的 ids
# if ids:
# for id0 in ids:
# all_ids.append([id0])
# 保存 fragments
print('保存 fragments 文件...')
for i, ids in enumerate(tqdm(all_ids, desc='保存 fragments...')):
poses = []
intrinsics = []
for id in ids:
# 根据需要调整相机外参,例如在 ARKit 坐标系中移动 X-Y 平面
# 这里假设需要将 Z 轴向上移动 1.5 米
cam_pose_dict[id][2, 3] += 1.5
poses.append(cam_pose_dict[id])
intrinsics.append(cam_intrinsic_dict[id]['K'])
print(ids)
fragments.append({
'scene': os.path.basename(os.path.normpath(data_path)),
'fragment_id': i,
'image_ids': ids, # 这里存储的是字符串标签
'extrinsics': poses,
'intrinsics': intrinsics
})
# 保存为 pkl 文件
pkl_path = os.path.join(data_path, 'fragments.pkl')
with open(pkl_path, 'wb') as f:
pickle.dump(fragments, f)
print(f"fragments.pkl 文件已保存到 '{pkl_path}'。")
if __name__ == "__main__":
clmp = Colmap_OpenMVS_Reconstruction(workspace='../colmap_mvs_result/market')
clmp.reconstruction(file_path='example_data/table.mp4', fps=5)
clmp.show_pcd(mode='dense')
# clmp.show_mesh(show_pose=True)
clmp.export_NeuralRecon_data('./marketdata', 'markettable')

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐
所有评论(0)