""" # -*- coding: utf-8 -*- ----------------------------------------------------------------------------------- # Author: Nguyen Mau Dung # DoC: 2020.08.17 # email: nguyenmaudung93.kstn@gmail.com ----------------------------------------------------------------------------------- # Description: This script for the KITTI dataset """ import sys import os import math from builtins import int import numpy as np from torch.utils.data import Dataset import cv2 import torch src_dir = os.path.dirname(os.path.realpath(__file__)) # while not src_dir.endswith("sfa"): # src_dir = os.path.dirname(src_dir) if src_dir not in sys.path: sys.path.append(src_dir) from data_process.kitti_data_utils import gen_hm_radius, compute_radius, Calibration, get_filtered_lidar from data_process.kitti_bev_utils import makeBEVMap, drawRotatedBox, get_corners from data_process import transformation import config.kitti_config as cnf class KittiDataset(Dataset): def __init__(self, configs, mode='train', lidar_aug=None, hflip_prob=None, num_samples=None): self.dataset_dir = configs.dataset_dir self.input_size = configs.input_size self.hm_size = configs.hm_size self.num_classes = configs.num_classes self.max_objects = configs.max_objects assert mode in ['train', 'val', 'test'], 'Invalid mode: {}'.format(mode) self.mode = mode self.is_test = (self.mode == 'test') # sub_folder = 'testing' if self.is_test else 'training' self.lidar_aug = lidar_aug self.hflip_prob = hflip_prob if mode == 'val': self.val_data_url = configs.val_data_url self.lidar_dir = os.path.join(self.val_data_url, "velodyne") self.calib_dir = os.path.join(self.val_data_url, "calib") self.label_dir = os.path.join(self.val_data_url, "label_2") # self.image_dir = os.path.join(self.dataset_dir, sub_folder, "image_2") else: self.lidar_dir = os.path.join(self.dataset_dir, "velodyne") self.calib_dir = os.path.join(self.dataset_dir, "calib") self.label_dir = os.path.join(self.dataset_dir, "label_2") # split_txt_path = os.path.join('../dataset/apollo/', 'ImageSets', '{}.txt'.format(mode)) sample_list = [] sample_files = os.listdir(self.lidar_dir) for bin_file in sample_files: bin_name = bin_file.split('.')[0] sample_list.append(bin_name) self.sample_id_list = sample_list if num_samples is not None: self.sample_id_list = self.sample_id_list[:num_samples] self.num_samples = len(self.sample_id_list) def __len__(self): return len(self.sample_id_list) def __getitem__(self, index): if self.is_test: return self.load_img_only(index) else: return self.load_img_with_targets(index) def load_img_only(self, index): """Load only image for the testing phase""" sample_id = self.sample_id_list[index] # print(sample_id) # img_path, img_rgb = self.get_image(sample_id) lidarData = self.get_lidar(sample_id) lidarData = get_filtered_lidar(lidarData, cnf.boundary) bev_map = makeBEVMap(lidarData, cnf.boundary) bev_map = torch.from_numpy(bev_map) bev_path = os.path.join(self.lidar_dir, '{}.png'.format(sample_id)) metadatas = { 'bev_path': bev_path, } # return metadatas, bev_map, img_rgb return bev_map,metadatas def load_img_with_targets(self, index): """Load images and targets for the training and validation phase""" sample_id = self.sample_id_list[index] # img_path = os.path.join(self.image_dir, '{}.png'.format(sample_id)) lidarData = self.get_lidar(sample_id) # calib = self.get_calib(sample_id) labels, has_labels = self.get_label(sample_id) # if has_labels: # labels[:, 1:] = transformation.camera_to_lidar_box(labels[:, 1:], calib.V2C, calib.R0, calib.P2) if self.lidar_aug: lidarData, labels[:, 1:] = self.lidar_aug(lidarData, labels[:, 1:]) lidarData, labels = get_filtered_lidar(lidarData, cnf.boundary, labels) bev_map = makeBEVMap(lidarData, cnf.boundary) bev_map = torch.from_numpy(bev_map) hflipped = False if np.random.random() < self.hflip_prob: hflipped = True # C, H, W bev_map = torch.flip(bev_map, [-1]) targets = self.build_targets(labels, hflipped) # metadatas = { # 'img_path': img_path, # 'hflipped': hflipped # } # return metadatas, bev_map, targets return bev_map, targets def get_image(self, idx): img_path = os.path.join(self.image_dir, '{}.png'.format(idx)) img = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB) return img_path, img def get_calib(self, idx): calib_file = os.path.join(self.calib_dir, '{}.txt'.format(idx)) # assert os.path.isfile(calib_file) return Calibration(calib_file) def get_lidar(self, idx): lidar_file = os.path.join(self.lidar_dir, '{}.bin'.format(idx)) # assert os.path.isfile(lidar_file) return np.fromfile(lidar_file, dtype=np.float32).reshape(-1, 4) def get_label(self, idx): labels = [] label_path = os.path.join(self.label_dir, '{}.txt'.format(idx)) for line in open(label_path, 'r'): line = line.rstrip() line_parts = line.split(' ') obj_name = line_parts[0] # 'Car', 'Pedestrian', ... cat_id = int(cnf.CLASS_NAME_TO_ID[obj_name]) if cat_id <= -99: # ignore Tram and Misc continue truncated = int(float(line_parts[1])) # truncated pixel ratio [0..1] occluded = int(line_parts[2]) # 0=visible, 1=partly occluded, 2=fully occluded, 3=unknown alpha = float(line_parts[3]) # object observation angle [-pi..pi] # xmin, ymin, xmax, ymax # bbox = np.array([float(line_parts[4]), float(line_parts[5]), float(line_parts[6]), float(line_parts[7])]) # height, width, length (h, w, l) h, w, l = float(line_parts[8]), float(line_parts[9]), float(line_parts[10]) # location (x,y,z) in camera coord. x, y, z = float(line_parts[11]), float(line_parts[12]), float(line_parts[13]) ry = float(line_parts[14]) # yaw angle (around Y-axis in camera coordinates) [-pi..pi] object_label = [cat_id, x, y, z, h, w, l, ry] labels.append(object_label) if len(labels) == 0: labels = np.zeros((1, 8), dtype=np.float32) has_labels = False else: labels = np.array(labels, dtype=np.float32) has_labels = True return labels, has_labels def build_targets(self, labels, hflipped): minX = cnf.boundary['minX'] maxX = cnf.boundary['maxX'] minY = cnf.boundary['minY'] maxY = cnf.boundary['maxY'] minZ = cnf.boundary['minZ'] maxZ = cnf.boundary['maxZ'] num_objects = min(len(labels), self.max_objects) hm_l, hm_w = self.hm_size hm_main_center = np.zeros((self.num_classes, hm_l, hm_w), dtype=np.float32) cen_offset = np.zeros((self.max_objects, 2), dtype=np.float32) direction = np.zeros((self.max_objects, 2), dtype=np.float32) z_coor = np.zeros((self.max_objects, 1), dtype=np.float32) dimension = np.zeros((self.max_objects, 3), dtype=np.float32) indices_center = np.zeros((self.max_objects), dtype=np.int64) obj_mask = np.zeros((self.max_objects), dtype=np.uint8) for k in range(num_objects): cls_id, x, y, z, h, w, l, yaw = labels[k] cls_id = int(cls_id) # Invert yaw angle yaw = -yaw if not ((minX <= x <= maxX) and (minY <= y <= maxY) and (minZ <= z <= maxZ)): continue if (h <= 0) or (w <= 0) or (l <= 0): continue bbox_l = l / cnf.bound_size_x * hm_l bbox_w = w / cnf.bound_size_y * hm_w radius = compute_radius((math.ceil(bbox_l), math.ceil(bbox_w))) radius = max(0, int(radius)) center_y = (x - minX) / cnf.bound_size_x * hm_l # x --> y (invert to 2D image space) center_x = (y - minY) / cnf.bound_size_y * hm_w # y --> x center = np.array([center_x, center_y], dtype=np.float32) if hflipped: center[0] = hm_w - center[0] - 1 center_int = center.astype(np.int32) if cls_id < 0: ignore_ids = [_ for _ in range(self.num_classes)] if cls_id == - 1 else [- cls_id - 2] # Consider to make mask ignore for cls_ig in ignore_ids: gen_hm_radius(hm_main_center[cls_ig], center_int, radius) hm_main_center[ignore_ids, center_int[1], center_int[0]] = 0.9999 continue # Generate heatmaps for main center gen_hm_radius(hm_main_center[cls_id], center, radius) # Index of the center indices_center[k] = center_int[1] * hm_w + center_int[0] # targets for center offset cen_offset[k] = center - center_int # targets for dimension dimension[k, 0] = h dimension[k, 1] = w dimension[k, 2] = l # targets for direction direction[k, 0] = math.sin(float(yaw)) # im direction[k, 1] = math.cos(float(yaw)) # re # im -->> -im if hflipped: direction[k, 0] = - direction[k, 0] # targets for depth z_coor[k] = z - minZ # Generate object masks obj_mask[k] = 1 targets = { 'hm_cen': hm_main_center, 'cen_offset': cen_offset, 'direction': direction, 'z_coor': z_coor, 'dim': dimension, 'indices_center': indices_center, 'obj_mask': obj_mask, } return targets def draw_img_with_label(self, index): sample_id = self.sample_id_list[index] lidar_path = os.path.join(self.lidar_dir, '{}.bin'.format(sample_id)) lidarData = self.get_lidar(sample_id) calib = self.get_calib(sample_id) labels, has_labels = self.get_label(sample_id) print(lidar_path) if has_labels: labels[:, 1:] = transformation.camera_to_lidar_box(labels[:, 1:], calib.V2C, calib.R0, calib.P2) if self.lidar_aug: lidarData, labels[:, 1:] = self.lidar_aug(lidarData, labels[:, 1:]) lidarData, labels = get_filtered_lidar(lidarData, cnf.boundary, labels) bev_map = makeBEVMap(lidarData, cnf.boundary) print(labels) return bev_map, labels, lidar_path if __name__ == '__main__': from easydict import EasyDict as edict from data_process.transformation import OneOf, Random_Scaling, Random_Rotation, lidar_to_camera_box from utils.visualization_utils import merge_rgb_to_bev, show_rgb_image_with_boxes configs = edict() configs.distributed = False # For testing configs.pin_memory = False configs.num_samples = None configs.input_size = (1216, 608) configs.hm_size = (304, 152) configs.max_objects = 50 configs.num_classes = 3 configs.output_width = 608 # configs.dataset_dir = os.path.join('../../', 'dataset', 'kitti') # lidar_aug = OneOf([ # Random_Rotation(limit_angle=np.pi / 4, p=1.), # Random_Scaling(scaling_range=(0.95, 1.05), p=1.), # ], p=1.) lidar_aug = None dataset = KittiDataset(configs, mode='val', lidar_aug=lidar_aug, hflip_prob=0., num_samples=configs.num_samples) print('\n\nPress n to see the next sample >>> Press Esc to quit...') for idx in range(len(dataset)): bev_map, labels, lidar_path = dataset.draw_img_with_label(idx) calib = Calibration(lidar_path.replace(".bin", ".txt").replace("velodyne", "calib")) bev_map = (bev_map.transpose(1, 2, 0) * 255).astype(np.uint8) # bev_map = cv2.resize(bev_map, (cnf.BEV_HEIGHT, cnf.BEV_WIDTH)) print(bev_map.shape) for box_idx, (cls_id, x, y, z, h, w, l, yaw) in enumerate(labels): # Draw rotated box yaw = -yaw y1 = int((x - cnf.boundary['minX']) / cnf.DISCRETIZATION) x1 = int((y - cnf.boundary['minY']) / cnf.DISCRETIZATION) w1 = int(w / cnf.DISCRETIZATION) l1 = int(l / cnf.DISCRETIZATION) drawRotatedBox(bev_map, x1, y1, w1, l1, yaw, cnf.colors[int(cls_id)]) # Rotate the bev_map bev_map = cv2.rotate(bev_map, cv2.ROTATE_180) # labels[:, 1:] = lidar_to_camera_box(labels[:, 1:], calib.V2C, calib.R0, calib.P2) cv2.imshow('bev_map', bev_map) if cv2.waitKey(0) & 0xff == 27: break