dataloader.py
from nvidia.dali.pipeline import Pipeline
from nvidia.dali import ops
from nvidia.dali import types
from nvidia.dali.plugin.pytorch import DALIClassificationIterator, LastBatchPolicy
import numpy as np
import torch
from torch import nn
class HybridTrainPipeline(Pipeline):
    def __init__(self, batch_size, file_root, num_threads, device_id, num_shards, shard_id):
        super(HybridTrainPipeline, self).__init__(batch_size, num_threads, device_id)
        device_type = {0: "cpu"}  # num_shards == 0 means CPU-only; otherwise run on the GPU
        if num_shards == 0:
            self.input = ops.FileReader(file_root=file_root)
        else:
            self.input = ops.FileReader(file_root=file_root, num_shards=num_shards, shard_id=shard_id)
        # ##### Feel free to modify the ops below ############
        self.decode = ops.ImageDecoder(device=device_type.get(num_shards, "mixed"), output_type=types.RGB)
        self.res = ops.RandomResizedCrop(device=device_type.get(num_shards, "gpu"), size=224)
        self.cmnp = ops.CropMirrorNormalize(device=device_type.get(num_shards, "gpu"),
                                            dtype=types.FLOAT,  # formerly output_dtype in older DALI
                                            output_layout=types.NCHW,
                                            mean=0.,  # e.g. [0.485 * 255, 0.456 * 255, 0.406 * 255] for ImageNet
                                            std=1.)   # e.g. [0.229 * 255, 0.224 * 255, 0.225 * 255] for ImageNet
        # ####################################################

    def define_graph(self):
        jpegs, labels = self.input(name="Reader")
        images = self.decode(jpegs)
        images = self.res(images)
        images = self.cmnp(images)
        return images, labels
class HybridValPipeline(Pipeline):
    def __init__(self, batch_size, file_root, num_threads, device_id, num_shards, shard_id):
        super(HybridValPipeline, self).__init__(batch_size, num_threads, device_id)
        device_type = {0: "cpu"}  # num_shards == 0 means CPU-only; otherwise run on the GPU
        if num_shards == 0:
            self.input = ops.FileReader(file_root=file_root)
        else:
            self.input = ops.FileReader(file_root=file_root, num_shards=num_shards, shard_id=shard_id)
        # ##### Feel free to modify the ops below ############
        self.decode = ops.ImageDecoder(device=device_type.get(num_shards, "mixed"), output_type=types.RGB)
        self.res = ops.RandomResizedCrop(device=device_type.get(num_shards, "gpu"), size=224)
        self.cmnp = ops.CropMirrorNormalize(device=device_type.get(num_shards, "gpu"),
                                            dtype=types.FLOAT,  # formerly output_dtype in older DALI
                                            output_layout=types.NCHW,
                                            mean=0.,
                                            std=1.)
        # ####################################################

    def define_graph(self):
        jpegs, labels = self.input(name="Reader")
        images = self.decode(jpegs)
        images = self.res(images)
        images = self.cmnp(images)
        return images, labels
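
# A minimal sketch of driving one of these pipelines by hand, useful for a
# quick sanity check. The path and batch size below are illustrative
# assumptions, not part of this module:
def _demo_single_pipeline(file_root="./imagenet/train"):
    pipe = HybridTrainPipeline(batch_size=8, file_root=file_root,
                               num_threads=2, device_id=0,
                               num_shards=0, shard_id=-1)  # num_shards=0 -> CPU-only ops
    pipe.build()
    images, labels = pipe.run()  # one batch of decoded, cropped, normalized images
    return images, labels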
class TorchWrapper:
    """
    Wrap one or more DALI pipelines into a single iterator.

    parameters:
        num_shards : int, number of GPUs used in parallel (0 means CPU-only)
        data_loader : a single DALI iterator (num_shards == 0) or a list of
                      DALI iterators, one per shard
        iter_mode : str, either "recursion" or "iter"; how batches from the
                    per-shard pipelines are merged (default: "recursion")
    """
    def __init__(self, num_shards, data_loader, iter_mode="recursion"):
        self.index = 0
        self.count = 0
        self.num_shards = num_shards
        self.data_loader = data_loader
        self.iter_mode = iter_mode
        if self.iter_mode not in {"recursion", "iter"}:
            raise Exception("iter_mode should be either 'recursion' or 'iter'")

    def __iter__(self):
        return self

    def __len__(self):
        # Return the total number of samples, not the number of batches.
        if self.num_shards == 0:
            return self.data_loader.size
        else:
            return len(self.data_loader) * self.data_loader[0].size

    def __next__(self):
        if self.num_shards == 0:
            # CPU-only path: a single iterator
            data = next(self.data_loader)
            return data[0]["data"], data[0]["label"].view(-1).long()
        else:
            # One or more GPUs
            if self.iter_mode == "recursion":
                return self._get_next_recursion()
            elif self.iter_mode == "iter":
                return self._get_next_iter(self.data_loader[self.index])

    def _get_next_iter(self, data_loader):
        # Drain the current shard, then move on to the next one.
        if self.count == data_loader.size:
            self.index += 1
            data_loader = self.data_loader[self.index]
        self.count += 1
        data = next(data_loader)
        return data[0]["data"], data[0]["label"].view(-1).long()

    def _get_next_recursion(self):
        # Round-robin over the shards: batch k comes from shard k % num_shards.
        self.index = self.count % self.num_shards
        self.count += 1
        data_loader = self.data_loader[self.index]
        data = next(data_loader)
        return data[0]["data"], data[0]["label"].view(-1).long()
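
# A minimal sketch of how TorchWrapper merges two shards in its default
# "recursion" (round-robin) mode. `loader_gpu0` and `loader_gpu1` stand for
# already-built DALIClassificationIterator instances, one per GPU; the names
# are illustrative assumptions:
def _demo_wrap_two_shards(loader_gpu0, loader_gpu1):
    wrapper = TorchWrapper(num_shards=2, data_loader=[loader_gpu0, loader_gpu1])
    x0, y0 = next(wrapper)  # batch 0 is drawn from shard 0
    x1, y1 = next(wrapper)  # batch 1 is drawn from shard 1, then it wraps around
    return (x0, y0), (x1, y1)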
def get_iter_dali_cuda(batch_size=256, train_file_root="", val_file_root="", num_threads=4, device_id=[-1], num_shards=0, shard_id=[-1]):
    """
    Build data iterators usable for PyTorch training.
    Reading and preprocessing can be spread across several GPUs:
    1. create the DALI pipelines
    2. wrap them into PyTorch-compatible iterators
    3. merge the per-GPU pipelines into a single iterator
    4. output tensors are returned on the GPU (CUDA)
    The data must be laid out as follows:
    images
        |-file_list.txt
        |-images/dog
            |-dog_4.jpg
            |-dog_5.jpg
            |-dog_9.jpg
            |-dog_6.jpg
            |-dog_3.jpg
        |-images/kitten
            |-cat_10.jpg
            |-cat_5.jpg
            |-cat_9.jpg
            |-cat_8.jpg
            |-cat_1.jpg
    parameters:
        batch_size : int, number of samples per batch
        file_root : str, path to the data
        num_threads : int, number of CPU threads used for reading
        device_id : list of int, physical GPU ids
        shard_id : list of int, logical shard ids
        num_shards : int, number of shards (GPUs) the data is split across
    methods:
        get_train_pipeline(shard_id, device_id) : build a DALI pipeline that reads and preprocesses training data
        get_val_pipeline(shard_id, device_id) : build a DALI pipeline that reads and preprocesses validation data
        get_dali_iter_for_torch(pipelines, data_num) : wrap the pipelines into PyTorch-compatible iterators
        get_data_size(pipeline) : number of samples each pipeline actually yields, i.e. the dataset size rounded down to a whole number of batches (the trailing partial batch is dropped)
    example:
        # Read training and validation data from TRAIN_PATH and VAL_PATH with
        # batch_size 256, 4 reader threads, and 2 GPUs (physical ids 0 and 4).
        # By default all GPUs and 4 threads are used.
        # For a single GPU, set num_shards = 1, shard_id = [0], and keep
        # device_id as a list.
        # To run without a GPU, use get_iter_dali_cpu() instead.
        train_data_iter, val_data_iter = get_iter_dali_cuda(batch_size=256,
                                                            train_file_root=TRAIN_PATH,
                                                            val_file_root=VAL_PATH,
                                                            num_threads=4,
                                                            device_id=[0, 4],
                                                            num_shards=2,
                                                            shard_id=[0, 1])
        # Training in torch
        torch_model = TorchModel(para)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(torch_model.parameters())
        for epoch in range(num_epochs):
            for step, (x, y) in enumerate(train_data_iter):
                # data : x
                # labels : y
                x = x.to("cuda:0")
                y = y.to("cuda:0")
                output = torch_model(x)
                optimizer.zero_grad()
                loss = criterion(output, y)
                loss.backward()
                optimizer.step()
                ...
        ...
    """
    def get_train_pipeline(shard_id, device_id):
        pipeline = HybridTrainPipeline(batch_size=batch_size,
                                       file_root=train_file_root,
                                       num_threads=num_threads,
                                       num_shards=num_shards,
                                       shard_id=shard_id,
                                       device_id=device_id)
        return pipeline

    def get_val_pipeline(shard_id, device_id):
        pipeline = HybridValPipeline(batch_size=batch_size,
                                     file_root=val_file_root,
                                     num_threads=num_threads,
                                     num_shards=num_shards,
                                     shard_id=shard_id,
                                     device_id=device_id)
        return pipeline

    # One pipeline per (shard_id, device_id) pair
    pipeline_for_train = [get_train_pipeline(shard_id=shard_id_index, device_id=device_id_index)
                          for shard_id_index, device_id_index in zip(shard_id, device_id)]
    pipeline_for_val = [get_val_pipeline(shard_id=shard_id_index, device_id=device_id_index)
                        for shard_id_index, device_id_index in zip(shard_id, device_id)]
    [pipeline.build() for pipeline in pipeline_for_train]
    [pipeline.build() for pipeline in pipeline_for_val]

    def get_data_size(pipeline):
        # Dataset size rounded down to a whole number of batches
        data_num = pipeline.epoch_size()["Reader"]
        batch_size = pipeline.batch_size
        return data_num // batch_size * batch_size

    data_num_train = get_data_size(pipeline_for_train[0])
    data_num_val = get_data_size(pipeline_for_val[0])

    def get_dali_iter_for_torch(pipelines, data_num):
        return [DALIClassificationIterator(pipelines=pipeline,
                                           last_batch_policy=LastBatchPolicy.DROP,
                                           size=data_num) for pipeline in pipelines]

    data_loader_train = get_dali_iter_for_torch(pipeline_for_train, data_num_train)
    data_loader_val = get_dali_iter_for_torch(pipeline_for_val, data_num_val)
    train_data_iter = TorchWrapper(num_shards, data_loader_train)
    val_data_iter = TorchWrapper(num_shards, data_loader_val)
    return train_data_iter, val_data_iter
def get_iter_dali_cpu(batch_size=256, train_file_root="", val_file_root="", num_threads=4):
    pipeline_train = HybridTrainPipeline(batch_size=batch_size,
                                         file_root=train_file_root,
                                         num_threads=num_threads,
                                         num_shards=0,
                                         shard_id=-1,
                                         device_id=0)
    pipeline_val = HybridValPipeline(batch_size=batch_size,
                                     file_root=val_file_root,
                                     num_threads=num_threads,
                                     num_shards=0,
                                     shard_id=-1,
                                     device_id=0)
    pipeline_train.build()
    pipeline_val.build()

    def get_data_size(pipeline):
        # Dataset size rounded down to a whole number of batches
        data_num = pipeline.epoch_size()["Reader"]
        batch_size = pipeline.batch_size
        return data_num // batch_size * batch_size

    data_num_train = get_data_size(pipeline_train)
    data_num_val = get_data_size(pipeline_val)
    data_loader_train = DALIClassificationIterator(pipelines=pipeline_train,
                                                   last_batch_policy=LastBatchPolicy.DROP,
                                                   size=data_num_train)
    data_loader_val = DALIClassificationIterator(pipelines=pipeline_val,
                                                 last_batch_policy=LastBatchPolicy.DROP,
                                                 size=data_num_val)
    train_data_iter = TorchWrapper(0, data_loader_train)
    val_data_iter = TorchWrapper(0, data_loader_val)
    return train_data_iter, val_data_iter
if __name__ == "__main__":
    PATH = "./imagenet"
    TRAIN_PATH = "./imagenet/train"
    VALID_PATH = "./imagenet/val"
    train_data_iter_cuda, val_data_iter_cuda = get_iter_dali_cuda(batch_size=256,
                                                                  train_file_root=TRAIN_PATH,
                                                                  val_file_root=VALID_PATH,
                                                                  num_threads=4,
                                                                  device_id=[0, 4],
                                                                  num_shards=2,
                                                                  shard_id=[0, 1])
    train_data_iter_cpu, val_data_iter_cpu = get_iter_dali_cpu(batch_size=256,
                                                               train_file_root=TRAIN_PATH,
                                                               val_file_root=VALID_PATH,
                                                               num_threads=4)
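
    # A short smoke test over the CPU iterator, a sketch that assumes the
    # imagenet folders above exist; it stops after a few batches:
    for step, (x, y) in enumerate(train_data_iter_cpu):
        print(step, x.shape, y.shape)
        if step >= 2:
            break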
