""" # -*- coding: utf-8 -*- ----------------------------------------------------------------------------------- # Author: Nguyen Mau Dung # DoC: 2020.08.17 # email: nguyenmaudung93.kstn@gmail.com ----------------------------------------------------------------------------------- # Description: Testing script """ import argparse import sys import os import time import warnings warnings.filterwarnings("ignore", category=UserWarning) from easydict import EasyDict as edict import cv2 import torch import numpy as np import torch.nn.functional as F src_dir = os.path.dirname(os.path.realpath(__file__)) # while not src_dir.endswith("sfa"): # src_dir = os.path.dirname(src_dir) if src_dir not in sys.path: sys.path.append(src_dir) from data_process.kitti_dataloader import create_test_dataloader from models.model_utils import create_model import config.kitti_config as cnf def parse_test_configs(): parser = argparse.ArgumentParser(description='Testing config for the Implementation') parser.add_argument('--saved_fn', type=str, default='fpn_resnet_18', metavar='FN', help='The name using for saving logs, models,...') parser.add_argument('-a', '--arch', type=str, default='fpn_resnet_18', metavar='ARCH', help='The name of the model architecture') parser.add_argument('--model_dir', type=str, default='/train_out_model/', metavar='PATH', help='the path of the pretrained checkpoint') parser.add_argument('--K', type=int, default=50, help='the number of top K') parser.add_argument('--no_cuda', default= False, help='If true, cuda is not used.') parser.add_argument('--gpu_idx', default=0, type=int, help='GPU index to use.') parser.add_argument('--num_samples', type=int, default=None, help='Take a subset of the dataset to run and debug') parser.add_argument('--num_workers', type=int, default=1, help='Number of threads for loading data') parser.add_argument('--batch_size', type=int, default=1, help='mini-batch size (default: 4)') parser.add_argument('--peak_thresh', type=float, default=0.2) parser.add_argument('--dataset_dir', type=str,default='/dataset_dir/', help='If true, the output image of the testing phase will be saved') parser.add_argument('--results_dir', type=str,default='/results_dir/', help='If true, the output image of the testing phase will be saved') parser.add_argument('--save_test_output', type=bool, default=True, help='save the test output or not') parser.add_argument('--output_format', type=str, default='txt', metavar='PATH', help='the type of the test output (support image, video or none)') parser.add_argument('--output_video_fn', type=str, default='out_fpn_resnet_18', metavar='PATH', help='the video filename if the output format is video') parser.add_argument('--output-width', type=int, default=608, help='the width of showing output, the height maybe vary') configs = edict(vars(parser.parse_args())) configs.pin_memory = True configs.distributed = False # For testing on 1 GPU only configs.input_size = (1216, 608) configs.hm_size = (304, 152) configs.down_ratio = 4 configs.max_objects = 50 configs.imagenet_pretrained = False configs.head_conv = 64 configs.num_classes = 3 configs.num_center_offset = 2 configs.num_z = 1 configs.num_dim = 3 configs.num_direction = 2 # sin, cos configs.heads = { 'hm_cen': configs.num_classes, 'cen_offset': configs.num_center_offset, 'direction': configs.num_direction, 'z_coor': configs.num_z, 'dim': configs.num_dim } configs.num_input_features = 4 #################################################################### ##############Dataset, Checkpoints, and results dir configs######### #################################################################### configs.root_dir = '../' # configs.dataset_dir = os.path.join(configs.root_dir, 'dataset', 'apollo') # configs.results_dir_img = os.path.join(configs.results_dir, configs.saved_fn, 'image') # configs.results_dir_txt = os.path.join(configs.results_dir, configs.saved_fn, 'txt') # make_folder(configs.results_dir_img) # make_folder(configs.results_dir_txt) make_folder(configs.results_dir) return configs def _sigmoid(x): return torch.clamp(x.sigmoid_(), min=1e-4, max=1 - 1e-4) def time_synchronized(): torch.cuda.synchronize() if torch.cuda.is_available() else None return time.time() def make_folder(folder_name): if not os.path.exists(folder_name): os.makedirs(folder_name) def drawRotatedBox(img, x, y, w, l, yaw, color): bev_corners = get_corners(x, y, w, l, yaw) corners_int = bev_corners.reshape(-1, 1, 2).astype(int) cv2.polylines(img, [corners_int], True, color, 2) corners_int = bev_corners.reshape(-1, 2) cv2.line(img, (int(corners_int[0, 0]), int(corners_int[0, 1])), (int(corners_int[3, 0]), int(corners_int[3, 1])), (255, 255, 0), 2) # bev image coordinates format def get_corners(x, y, w, l, yaw): bev_corners = np.zeros((4, 2), dtype=np.float32) cos_yaw = np.cos(yaw) sin_yaw = np.sin(yaw) # front left bev_corners[0, 0] = x - w / 2 * cos_yaw - l / 2 * sin_yaw bev_corners[0, 1] = y - w / 2 * sin_yaw + l / 2 * cos_yaw # rear left bev_corners[1, 0] = x - w / 2 * cos_yaw + l / 2 * sin_yaw bev_corners[1, 1] = y - w / 2 * sin_yaw - l / 2 * cos_yaw # rear right bev_corners[2, 0] = x + w / 2 * cos_yaw + l / 2 * sin_yaw bev_corners[2, 1] = y + w / 2 * sin_yaw - l / 2 * cos_yaw # front right bev_corners[3, 0] = x + w / 2 * cos_yaw - l / 2 * sin_yaw bev_corners[3, 1] = y + w / 2 * sin_yaw + l / 2 * cos_yaw return bev_corners def _nms(heat, kernel=3): pad = (kernel - 1) // 2 hmax = F.max_pool2d(heat, (kernel, kernel), stride=1, padding=pad) keep = (hmax == heat).float() return heat * keep def _gather_feat(feat, ind, mask=None): dim = feat.size(2) ind = ind.unsqueeze(2).expand(ind.size(0), ind.size(1), dim) feat = feat.gather(1, ind) if mask is not None: mask = mask.unsqueeze(2).expand_as(feat) feat = feat[mask] feat = feat.view(-1, dim) return feat def _transpose_and_gather_feat(feat, ind): feat = feat.permute(0, 2, 3, 1).contiguous() feat = feat.view(feat.size(0), -1, feat.size(3)) feat = _gather_feat(feat, ind) return feat def _topk(scores, K=40): batch, cat, height, width = scores.size() topk_scores, topk_inds = torch.topk(scores.view(batch, cat, -1), K) topk_inds = topk_inds % (height * width) topk_ys = (torch.floor_divide(topk_inds, width)).float() topk_xs = (topk_inds % width).int().float() topk_score, topk_ind = torch.topk(topk_scores.view(batch, -1), K) topk_clses = (torch.floor_divide(topk_ind, K)).int() topk_inds = _gather_feat(topk_inds.view(batch, -1, 1), topk_ind).view(batch, K) topk_ys = _gather_feat(topk_ys.view(batch, -1, 1), topk_ind).view(batch, K) topk_xs = _gather_feat(topk_xs.view(batch, -1, 1), topk_ind).view(batch, K) return topk_score, topk_inds, topk_clses, topk_ys, topk_xs def decode(hm_cen, cen_offset, direction, z_coor, dim, K=40): batch_size, num_classes, height, width = hm_cen.size() hm_cen = _nms(hm_cen) scores, inds, clses, ys, xs = _topk(hm_cen, K=K) if cen_offset is not None: cen_offset = _transpose_and_gather_feat(cen_offset, inds) cen_offset = cen_offset.view(batch_size, K, 2) xs = xs.view(batch_size, K, 1) + cen_offset[:, :, 0:1] ys = ys.view(batch_size, K, 1) + cen_offset[:, :, 1:2] else: xs = xs.view(batch_size, K, 1) + 0.5 ys = ys.view(batch_size, K, 1) + 0.5 direction = _transpose_and_gather_feat(direction, inds) direction = direction.view(batch_size, K, 2) z_coor = _transpose_and_gather_feat(z_coor, inds) z_coor = z_coor.view(batch_size, K, 1) dim = _transpose_and_gather_feat(dim, inds) dim = dim.view(batch_size, K, 3) clses = clses.view(batch_size, K, 1).float() scores = scores.view(batch_size, K, 1) # (scores x 1, ys x 1, xs x 1, z_coor x 1, dim x 3, direction x 2, clses x 1) # (scores-0:1, ys-1:2, xs-2:3, z_coor-3:4, dim-4:7, direction-7:9, clses-9:10) # detections: [batch_size, K, 10] detections = torch.cat([scores, xs, ys, z_coor, dim, direction, clses], dim=2) return detections def get_yaw(direction): return np.arctan2(direction[:, 0:1], direction[:, 1:2]) def post_processing(detections, num_classes=3, down_ratio=4, peak_thresh=0.2): """ :param detections: [batch_size, K, 10] # (scores x 1, xs x 1, ys x 1, z_coor x 1, dim x 3, direction x 2, clses x 1) # (scores-0:1, xs-1:2, ys-2:3, z_coor-3:4, dim-4:7, direction-7:9, clses-9:10) :return: """ # TODO: Need to consider rescale to the original scale: x, y ret = [] for i in range(detections.shape[0]): top_preds = {} classes = detections[i, :, -1] for j in range(num_classes): inds = (classes == j) # x, y, z, h, w, l, yaw top_preds[j] = np.concatenate([ detections[i, inds, 0:1], detections[i, inds, 1:2] * down_ratio, detections[i, inds, 2:3] * down_ratio, detections[i, inds, 3:4], detections[i, inds, 4:5], detections[i, inds, 5:6] / cnf.bound_size_y * cnf.BEV_WIDTH, detections[i, inds, 6:7] / cnf.bound_size_x * cnf.BEV_HEIGHT, get_yaw(detections[i, inds, 7:9]).astype(np.float32)], axis=1) # Filter by peak_thresh if len(top_preds[j]) > 0: keep_inds = (top_preds[j][:, 0] > peak_thresh) top_preds[j] = top_preds[j][keep_inds] ret.append(top_preds) return ret def draw_predictions(img, detections, num_classes=3): for j in range(num_classes): if len(detections[j]) > 0: for det in detections[j]: # (scores-0:1, x-1:2, y-2:3, z-3:4, dim-4:7, yaw-7:8) _score, _x, _y, _z, _h, _w, _l, _yaw = det drawRotatedBox(img, _x, _y, _w, _l, _yaw, cnf.colors[int(j)]) return img def convert_det_to_real_values(detections, num_classes=3): kitti_dets = [] for cls_id in range(num_classes): if len(detections[cls_id]) > 0: for det in detections[cls_id]: # (scores-0:1, x-1:2, y-2:3, z-3:4, dim-4:7, yaw-7:8) _score, _x, _y, _z, _h, _w, _l, _yaw = det _yaw = round(-_yaw/1, 2) x = round(_y / cnf.BEV_HEIGHT * cnf.bound_size_x + cnf.boundary['minX'], 2) y = round(_x / cnf.BEV_WIDTH * cnf.bound_size_y + cnf.boundary['minY'], 2) z = round(_z + cnf.boundary['minZ'], 2) w = round(_w / cnf.BEV_WIDTH * cnf.bound_size_y, 2) l = round(_l / cnf.BEV_HEIGHT * cnf.bound_size_x, 2) h = round(_h/1, 2) kitti_dets.append([cls_id, h, w, l, x, y, z, _yaw]) return np.array(kitti_dets) if __name__ == '__main__': print("=".ljust(66, "=")) configs = parse_test_configs() model = create_model(configs) print('\n\n' + '-*=' * 30 + '\n\n') # assert os.path.isfile(configs.model_dir), "No file at {}".format(configs.model_dir) if os.path.isfile(configs.model_dir): model_path = configs.model_dir else: # for file in os.listdir(configs.model_dir): # model_path = os.path.join(configs.model_dir, file) # 取最后一个模型 model_path = os.path.join(configs.model_dir, os.listdir(configs.model_dir)[-1]) print('Loaded weights from {}\n'.format(model_path)) # model.load_state_dict(torch.load(model_path)) configs.device = torch.device('cpu' if configs.no_cuda else 'cuda:{}'.format(configs.gpu_idx)) model.load_state_dict(torch.load(model_path, map_location=configs.device)) model = model.to(device=configs.device) out_cap = None model.eval() test_dataloader = create_test_dataloader(configs) with torch.no_grad(): for batch_idx, batch_data in enumerate(test_dataloader): bev_maps, metadatas = batch_data input_bev_maps = bev_maps.to(configs.device, non_blocking=True).float() t1 = time_synchronized() outputs = model(input_bev_maps) outputs['hm_cen'] = _sigmoid(outputs['hm_cen']) outputs['cen_offset'] = _sigmoid(outputs['cen_offset']) # detections size (batch_size, K, 10) detections = decode(outputs['hm_cen'], outputs['cen_offset'], outputs['direction'], outputs['z_coor'], outputs['dim'], K=configs.K) detections = detections.cpu().numpy().astype(np.float32) detections = post_processing(detections, configs.num_classes, configs.down_ratio, configs.peak_thresh) t2 = time_synchronized() detections = detections[0] # only first batch # Draw prediction in the image bev_map = (bev_maps.squeeze().permute(1, 2, 0).numpy() * 255).astype(np.uint8) bev_map = cv2.resize(bev_map, (cnf.BEV_WIDTH, cnf.BEV_HEIGHT)) bev_map = draw_predictions(bev_map, detections.copy(), configs.num_classes) # Rotate the bev_map bev_map = cv2.rotate(bev_map, cv2.ROTATE_180) kitti_dets = convert_det_to_real_values(detections) print('\tDone testing the {}th sample, time: {:.1f}ms, speed {:.2f}FPS'.format(batch_idx, (t2 - t1) * 1000, 1 / (t2 - t1))) if configs.save_test_output: img_fn = os.path.basename(metadatas['bev_path'][0])[:-4] if configs.output_format == 'image': cv2.imwrite(os.path.join(configs.results_dir_img, '{}.jpg'.format(img_fn)), bev_map) elif configs.output_format == 'video': if out_cap is None: out_cap_h, out_cap_w = bev_map.shape[:2] fourcc = cv2.VideoWriter_fourcc(*'MJPG') out_cap = cv2.VideoWriter( os.path.join(configs.results_dir_img, '{}.avi'.format(configs.output_video_fn)), fourcc, 30, (out_cap_w, out_cap_h)) out_cap.write(bev_map) else: pass txt_path = os.path.join(configs.results_dir,'{}.txt'.format(img_fn)) txt_file = open(txt_path, 'w') for det in kitti_dets: write_line = cnf.CLASS_ID_TO_NAME[det[0]] + ' 0 0 0 0 0 0 0 ' + str(det[1]) + ' ' + str(det[2]) +\ ' ' + str(det[3]) + ' ' + str(det[4]) + ' ' + str(det[5]) + ' ' + str(det[6]) + ' ' + str(det[7]) +'\n' txt_file.writelines(write_line) txt_file.close() if out_cap: out_cap.release() cv2.destroyAllWindows()