-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
timestamp plots, towards synchronization (maybe not needed)
- Loading branch information
bs_lab
committed
Oct 8, 2024
1 parent
32aeddb
commit 679cecc
Showing
10 changed files
with
523 additions
and
56 deletions.
There are no files selected for viewing
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
import logging | ||
import time | ||
from pathlib import Path | ||
from typing import Dict, List, Union | ||
|
||
import numpy as np | ||
from pydantic import BaseModel | ||
from scipy.stats import median_abs_deviation | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class TimestampDiagnosticsDataClass(BaseModel): | ||
mean_framerates_per_camera: dict | ||
standard_deviation_framerates_per_camera: dict | ||
median_framerates_per_camera: dict | ||
median_absolute_deviation_per_camera: dict | ||
mean_mean_framerate: float | ||
mean_standard_deviation_framerates: float | ||
mean_median_framerates: float | ||
mean_median_absolute_deviation_per_camera: float | ||
|
||
|
||
def create_timestamp_diagnostic_plots( | ||
timestamp_dictionary: Dict[int, np.ndarray], | ||
path_to_save_plots_png: Union[str, Path], | ||
): | ||
"""plot some diagnostics to assess quality of camera sync""" | ||
|
||
# opportunistic load of matplotlib to avoid startup time costs | ||
from matplotlib import pyplot as plt | ||
|
||
plt.set_loglevel("warning") | ||
|
||
for timestamp_array in timestamp_dictionary.values(): | ||
timestamp_array /= 1e9 | ||
|
||
max_frame_duration = 0.1 | ||
fig = plt.figure(figsize=(18, 12)) | ||
session_name = Path(path_to_save_plots_png).parent.parent.parent.stem | ||
recording_name = Path(path_to_save_plots_png).parent.parent.stem | ||
fig.suptitle(f"Timestamps of synchronized frames\nsession: {session_name}, recording: {recording_name}") | ||
|
||
ax1 = plt.subplot( | ||
231, | ||
title="(Raw) Camera Frame Timestamp vs Frame#\n(Lines should have same slope)", | ||
xlabel="Frame#", | ||
ylabel="Timestamp (sec)", | ||
) | ||
ax2 = plt.subplot( | ||
232, | ||
ylim=(0, max_frame_duration), | ||
title="(Raw) Camera Frame Duration Trace", | ||
xlabel="Frame#", | ||
ylabel="Duration (sec)", | ||
) | ||
ax3 = plt.subplot( | ||
233, | ||
xlim=(0, max_frame_duration), | ||
title="(Raw) Camera Frame Duration Histogram (count)", | ||
xlabel="Duration(s, 1ms bins)", | ||
ylabel="Probability", | ||
) | ||
ax4 = plt.subplot( | ||
234, | ||
title="(Synchronized) Camera Frame Timestamp vs Frame#\n(Lines should be on top of each other)", | ||
xlabel="Frame#", | ||
ylabel="Timestamp (sec)", | ||
) | ||
ax5 = plt.subplot( | ||
235, | ||
ylim=(0, max_frame_duration), | ||
title="(Synchronized) Camera Frame Duration Trace", | ||
xlabel="Frame#", | ||
ylabel="Duration (sec)", | ||
) | ||
ax6 = plt.subplot( | ||
236, | ||
xlim=(0, max_frame_duration), | ||
title="(Synchronized) Camera Frame Duration Histogram (count)", | ||
xlabel="Duration(s, 1ms bins)", | ||
ylabel="Probability", | ||
) | ||
|
||
for camera_id, timestamps in timestamp_dictionary.items(): | ||
ax1.plot(timestamps, label=f"Camera# {str(camera_id)}") | ||
ax1.legend() | ||
ax2.plot(np.diff(timestamps), ".") | ||
ax3.hist( | ||
np.diff(timestamps), | ||
bins=np.arange(0, max_frame_duration, 0.0025), | ||
alpha=0.5, | ||
) | ||
|
||
# for camera_id, timestamps in synchronized_timestamps_dictionary.items(): | ||
# ax4.plot(timestamps, label=f"Camera# {str(camera_id)}") | ||
# ax4.legend() | ||
# ax5.plot(np.diff(timestamps), ".") | ||
# ax6.hist( | ||
# np.diff(timestamps), | ||
# bins=np.arange(0, max_frame_duration, 0.0025), | ||
# alpha=0.5, | ||
# ) | ||
|
||
plt.tight_layout() | ||
|
||
fig_save_path = Path(path_to_save_plots_png) | ||
plt.savefig(str(fig_save_path)) | ||
logger.info(f"Saving diagnostic figure tp: {fig_save_path}") | ||
|
||
def timestamps_array_to_dictionary(timestamps_array: np.ndarray) -> Dict[int, np.ndarray]: | ||
return {i: timestamps_array[i, :] for i in range(timestamps_array.shape[0])} | ||
|
||
def calculate_camera_diagnostic_results( | ||
timestamps_dictionary, | ||
) -> TimestampDiagnosticsDataClass: | ||
mean_framerates_per_camera = {} | ||
standard_deviation_framerates_per_camera = {} | ||
median_framerates_per_camera = {} | ||
median_absolute_deviation_per_camera = {} | ||
|
||
for cam_id, timestamps in timestamps_dictionary.items(): | ||
timestamps_formatted = (np.asarray(timestamps) - timestamps[0]) / 1e9 | ||
frame_durations = np.diff(timestamps_formatted) | ||
framerate_per_frame = 1 / frame_durations | ||
mean_framerates_per_camera[cam_id] = np.nanmean(framerate_per_frame) | ||
median_framerates_per_camera[cam_id] = np.nanmedian(framerate_per_frame) | ||
standard_deviation_framerates_per_camera[cam_id] = np.nanstd( | ||
framerate_per_frame | ||
) | ||
median_absolute_deviation_per_camera[cam_id] = median_abs_deviation( | ||
framerate_per_frame | ||
) | ||
|
||
mean_mean_framerate = np.nanmean(list(mean_framerates_per_camera.values())) | ||
mean_standard_deviation_framerates = np.nanmean( | ||
list(standard_deviation_framerates_per_camera.values()) | ||
) | ||
mean_median_framerates = np.nanmean(list(median_framerates_per_camera.values())) | ||
mean_median_absolute_deviation_per_camera = np.nanmean( | ||
list(median_absolute_deviation_per_camera.values()) | ||
) | ||
|
||
return TimestampDiagnosticsDataClass( | ||
mean_framerates_per_camera=mean_framerates_per_camera, | ||
standard_deviation_framerates_per_camera=standard_deviation_framerates_per_camera, | ||
median_framerates_per_camera=median_framerates_per_camera, | ||
median_absolute_deviation_per_camera=median_absolute_deviation_per_camera, | ||
mean_mean_framerate=float(mean_mean_framerate), | ||
mean_standard_deviation_framerates=float(mean_standard_deviation_framerates), | ||
mean_median_framerates=float(mean_median_framerates), | ||
mean_median_absolute_deviation_per_camera=float( | ||
mean_median_absolute_deviation_per_camera | ||
), | ||
) |
Empty file.
131 changes: 131 additions & 0 deletions
131
Cameras/exploration_scripts/pylon_viewer_timestamp_synchronize.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
from datetime import datetime, timedelta | ||
from typing import Dict | ||
import cv2 | ||
|
||
from pathlib import Path | ||
|
||
|
||
class TimestampSynchronize: | ||
def __init__(self, folder_path: Path): | ||
self.raw_videos_path = folder_path / "raw_videos" | ||
self.synched_videos_path = folder_path / "synched_videos" | ||
|
||
self.synched_videos_path.mkdir(parents=True, exist_ok=True) | ||
|
||
def synchronize(self): | ||
self.setup() | ||
target_framecount = ( | ||
self.get_lowest_postoffset_frame_count() - 1 | ||
) # -1 accounts for rounding errors in calculating offset | ||
print(f"synchronizing videos to target framecount: {target_framecount}") | ||
for video_name, cap in self.capture_dict.items(): | ||
print(f"synchronizing: {video_name}") | ||
current_framecount = 0 | ||
offset = self.frame_offset_dict[video_name] | ||
while current_framecount < target_framecount: # < to account for 0 indexing | ||
ret, frame = cap.read() | ||
if not ret: | ||
raise ValueError(f"{video_name} has no more frames.") | ||
if offset <= 0: | ||
self.writer_dict[video_name].write(frame) | ||
current_framecount += 1 | ||
else: | ||
offset -= 1 | ||
self.close() | ||
print("Done synchronizing") | ||
|
||
def setup(self): | ||
print("Setting up for synchronization...") | ||
self.create_capture_dict() | ||
self.validate_fps() | ||
self.create_writer_dict() | ||
self.create_starting_timestamp_dict() | ||
self.create_frame_offset_dict() | ||
|
||
def create_capture_dict(self): | ||
self.capture_dict = { | ||
video_path.name: cv2.VideoCapture(str(video_path)) | ||
for video_path in self.raw_videos_path.iterdir() | ||
} | ||
|
||
def create_writer_dict(self): | ||
self.writer_dict = { | ||
video_name: cv2.VideoWriter( | ||
str(self.synched_videos_path / (video_name.split(".")[0] + ".mp4")), | ||
cv2.VideoWriter.fourcc(*"mp4v"), | ||
cap.get(cv2.CAP_PROP_FPS), | ||
( | ||
int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), | ||
int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), | ||
), | ||
) | ||
for video_name, cap in self.capture_dict.items() | ||
} | ||
|
||
def create_starting_timestamp_dict(self): | ||
self.starting_timestamp_dict = { | ||
video_name: datetime.fromisoformat(video_name.split(".")[0].split("__")[-1]) | ||
for video_name in self.capture_dict.keys() | ||
} | ||
|
||
def create_frame_offset_dict(self): | ||
latest_start = sorted(self.starting_timestamp_dict.values())[-1] | ||
frame_duration_seconds = 1 / self.fps | ||
|
||
self.frame_offset_dict: Dict[str, int] = {} | ||
|
||
for video_name, time in self.starting_timestamp_dict.items(): | ||
offset_microseconds = (latest_start - time) / timedelta(microseconds=1) | ||
offset_frames = round( | ||
timedelta(microseconds=offset_microseconds) | ||
/ timedelta(seconds=frame_duration_seconds) | ||
) | ||
print(f"{video_name} offset in microseconds: {offset_microseconds}") | ||
print(f"{video_name} offset in frames: {offset_frames}") | ||
print( | ||
f"{video_name} total frames: {int(self.capture_dict[video_name].get(cv2.CAP_PROP_FRAME_COUNT))}" | ||
) | ||
self.frame_offset_dict[video_name] = offset_frames | ||
|
||
def get_lowest_postoffset_frame_count(self) -> int: | ||
return int( | ||
min( | ||
cap.get(cv2.CAP_PROP_FRAME_COUNT) - self.frame_offset_dict[video_name] | ||
for video_name, cap in self.capture_dict.items() | ||
) | ||
) | ||
|
||
def validate_fps(self): | ||
fps = set(cap.get(cv2.CAP_PROP_FPS) for cap in self.capture_dict.values()) | ||
|
||
if len(fps) > 1: | ||
print(f"set of video fps: {fps}") | ||
raise ValueError("Not all videos have the same fps") | ||
|
||
self.fps = fps.pop() | ||
|
||
def close(self): | ||
print("Closing all capture objects and writers") | ||
self.release_captures() | ||
self.release_writers() | ||
|
||
def release_captures(self): | ||
for cap in self.capture_dict.values(): | ||
cap.release() | ||
|
||
def release_writers(self): | ||
for writer in self.writer_dict.values(): | ||
writer.release() | ||
|
||
|
||
if __name__ == "__main__": | ||
# folder_path = Path( | ||
# "/Users/philipqueen/Documents/Humon Research Lab/Basler Stuff/calibration_attempt/" | ||
# ) | ||
|
||
folder_path = Path( | ||
"/Users/philipqueen/Documents/Humon Research Lab/Basler Stuff/fabio_hand/" | ||
) | ||
|
||
timestamp_synchronize = TimestampSynchronize(folder_path) | ||
timestamp_synchronize.synchronize() |
Oops, something went wrong.