You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

bulk_write.go 14 kB

4 years ago

  1. // Copyright (C) MongoDB, Inc. 2017-present.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. // not use this file except in compliance with the License. You may obtain
  5. // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
  6. package mongo
  7. import (
  8. "context"
  9. "go.mongodb.org/mongo-driver/bson/bsoncodec"
  10. "go.mongodb.org/mongo-driver/mongo/options"
  11. "go.mongodb.org/mongo-driver/mongo/writeconcern"
  12. "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
  13. "go.mongodb.org/mongo-driver/x/mongo/driver"
  14. "go.mongodb.org/mongo-driver/x/mongo/driver/description"
  15. "go.mongodb.org/mongo-driver/x/mongo/driver/operation"
  16. "go.mongodb.org/mongo-driver/x/mongo/driver/session"
  17. )
  18. type bulkWriteBatch struct {
  19. models []WriteModel
  20. canRetry bool
  21. }
  22. // bulkWrite perfoms a bulkwrite operation
  23. type bulkWrite struct {
  24. ordered *bool
  25. bypassDocumentValidation *bool
  26. models []WriteModel
  27. session *session.Client
  28. collection *Collection
  29. selector description.ServerSelector
  30. writeConcern *writeconcern.WriteConcern
  31. result BulkWriteResult
  32. }
  33. func (bw *bulkWrite) execute(ctx context.Context) error {
  34. ordered := true
  35. if bw.ordered != nil {
  36. ordered = *bw.ordered
  37. }
  38. batches := createBatches(bw.models, ordered)
  39. bw.result = BulkWriteResult{
  40. UpsertedIDs: make(map[int64]interface{}),
  41. }
  42. bwErr := BulkWriteException{
  43. WriteErrors: make([]BulkWriteError, 0),
  44. }
  45. var lastErr error
  46. var opIndex int64 // the operation index for the upsertedIDs map
  47. continueOnError := !ordered
  48. for _, batch := range batches {
  49. if len(batch.models) == 0 {
  50. continue
  51. }
  52. bypassDocValidation := bw.bypassDocumentValidation
  53. if bypassDocValidation != nil && !*bypassDocValidation {
  54. bypassDocValidation = nil
  55. }
  56. batchRes, batchErr, err := bw.runBatch(ctx, batch)
  57. bw.mergeResults(batchRes, opIndex)
  58. bwErr.WriteConcernError = batchErr.WriteConcernError
  59. for i := range batchErr.WriteErrors {
  60. batchErr.WriteErrors[i].Index = batchErr.WriteErrors[i].Index + int(opIndex)
  61. }
  62. bwErr.WriteErrors = append(bwErr.WriteErrors, batchErr.WriteErrors...)
  63. if !continueOnError && (err != nil || len(batchErr.WriteErrors) > 0 || batchErr.WriteConcernError != nil) {
  64. if err != nil {
  65. return err
  66. }
  67. return bwErr
  68. }
  69. if err != nil {
  70. lastErr = err
  71. }
  72. opIndex += int64(len(batch.models))
  73. }
  74. bw.result.MatchedCount -= bw.result.UpsertedCount
  75. if lastErr != nil {
  76. return lastErr
  77. }
  78. if len(bwErr.WriteErrors) > 0 || bwErr.WriteConcernError != nil {
  79. return bwErr
  80. }
  81. return nil
  82. }
  83. func (bw *bulkWrite) runBatch(ctx context.Context, batch bulkWriteBatch) (BulkWriteResult, BulkWriteException, error) {
  84. batchRes := BulkWriteResult{
  85. UpsertedIDs: make(map[int64]interface{}),
  86. }
  87. batchErr := BulkWriteException{}
  88. var writeErrors []driver.WriteError
  89. switch batch.models[0].(type) {
  90. case *InsertOneModel:
  91. res, err := bw.runInsert(ctx, batch)
  92. if err != nil {
  93. writeErr, ok := err.(driver.WriteCommandError)
  94. if !ok {
  95. return BulkWriteResult{}, batchErr, err
  96. }
  97. writeErrors = writeErr.WriteErrors
  98. batchErr.WriteConcernError = convertDriverWriteConcernError(writeErr.WriteConcernError)
  99. }
  100. batchRes.InsertedCount = int64(res.N)
  101. case *DeleteOneModel, *DeleteManyModel:
  102. res, err := bw.runDelete(ctx, batch)
  103. if err != nil {
  104. writeErr, ok := err.(driver.WriteCommandError)
  105. if !ok {
  106. return BulkWriteResult{}, batchErr, err
  107. }
  108. writeErrors = writeErr.WriteErrors
  109. batchErr.WriteConcernError = convertDriverWriteConcernError(writeErr.WriteConcernError)
  110. }
  111. batchRes.DeletedCount = int64(res.N)
  112. case *ReplaceOneModel, *UpdateOneModel, *UpdateManyModel:
  113. res, err := bw.runUpdate(ctx, batch)
  114. if err != nil {
  115. writeErr, ok := err.(driver.WriteCommandError)
  116. if !ok {
  117. return BulkWriteResult{}, batchErr, err
  118. }
  119. writeErrors = writeErr.WriteErrors
  120. batchErr.WriteConcernError = convertDriverWriteConcernError(writeErr.WriteConcernError)
  121. }
  122. batchRes.MatchedCount = int64(res.N)
  123. batchRes.ModifiedCount = int64(res.NModified)
  124. batchRes.UpsertedCount = int64(len(res.Upserted))
  125. for _, upsert := range res.Upserted {
  126. batchRes.UpsertedIDs[upsert.Index] = upsert.ID
  127. }
  128. }
  129. batchErr.WriteErrors = make([]BulkWriteError, 0, len(writeErrors))
  130. convWriteErrors := writeErrorsFromDriverWriteErrors(writeErrors)
  131. for _, we := range convWriteErrors {
  132. batchErr.WriteErrors = append(batchErr.WriteErrors, BulkWriteError{
  133. WriteError: we,
  134. Request: batch.models[0],
  135. })
  136. }
  137. return batchRes, batchErr, nil
  138. }
  139. func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (operation.InsertResult, error) {
  140. docs := make([]bsoncore.Document, len(batch.models))
  141. var i int
  142. for _, model := range batch.models {
  143. converted := model.(*InsertOneModel)
  144. doc, _, err := transformAndEnsureIDv2(bw.collection.registry, converted.Document)
  145. if err != nil {
  146. return operation.InsertResult{}, err
  147. }
  148. docs[i] = doc
  149. i++
  150. }
  151. op := operation.NewInsert(docs...).
  152. Session(bw.session).WriteConcern(bw.writeConcern).CommandMonitor(bw.collection.client.monitor).
  153. ServerSelector(bw.selector).ClusterClock(bw.collection.client.clock).
  154. Database(bw.collection.db.name).Collection(bw.collection.name).
  155. Deployment(bw.collection.client.topology)
  156. if bw.bypassDocumentValidation != nil && *bw.bypassDocumentValidation {
  157. op = op.BypassDocumentValidation(*bw.bypassDocumentValidation)
  158. }
  159. if bw.ordered != nil {
  160. op = op.Ordered(*bw.ordered)
  161. }
  162. retry := driver.RetryNone
  163. if bw.collection.client.retryWrites && batch.canRetry {
  164. retry = driver.RetryOncePerCommand
  165. }
  166. op = op.Retry(retry)
  167. err := op.Execute(ctx)
  168. return op.Result(), err
  169. }
  170. func (bw *bulkWrite) runDelete(ctx context.Context, batch bulkWriteBatch) (operation.DeleteResult, error) {
  171. docs := make([]bsoncore.Document, len(batch.models))
  172. var i int
  173. for _, model := range batch.models {
  174. var doc bsoncore.Document
  175. var err error
  176. switch converted := model.(type) {
  177. case *DeleteOneModel:
  178. doc, err = createDeleteDoc(converted.Filter, converted.Collation, true, bw.collection.registry)
  179. case *DeleteManyModel:
  180. doc, err = createDeleteDoc(converted.Filter, converted.Collation, false, bw.collection.registry)
  181. }
  182. if err != nil {
  183. return operation.DeleteResult{}, err
  184. }
  185. docs[i] = doc
  186. i++
  187. }
  188. op := operation.NewDelete(docs...).
  189. Session(bw.session).WriteConcern(bw.writeConcern).CommandMonitor(bw.collection.client.monitor).
  190. ServerSelector(bw.selector).ClusterClock(bw.collection.client.clock).
  191. Database(bw.collection.db.name).Collection(bw.collection.name).
  192. Deployment(bw.collection.client.topology)
  193. if bw.ordered != nil {
  194. op = op.Ordered(*bw.ordered)
  195. }
  196. retry := driver.RetryNone
  197. if bw.collection.client.retryWrites && batch.canRetry {
  198. retry = driver.RetryOncePerCommand
  199. }
  200. op = op.Retry(retry)
  201. err := op.Execute(ctx)
  202. return op.Result(), err
  203. }
  204. func createDeleteDoc(filter interface{}, collation *options.Collation, deleteOne bool, registry *bsoncodec.Registry) (bsoncore.Document, error) {
  205. f, err := transformBsoncoreDocument(registry, filter)
  206. if err != nil {
  207. return nil, err
  208. }
  209. var limit int32
  210. if deleteOne {
  211. limit = 1
  212. }
  213. didx, doc := bsoncore.AppendDocumentStart(nil)
  214. doc = bsoncore.AppendDocumentElement(doc, "q", f)
  215. doc = bsoncore.AppendInt32Element(doc, "limit", limit)
  216. if collation != nil {
  217. doc = bsoncore.AppendDocumentElement(doc, "collation", collation.ToDocument())
  218. }
  219. doc, _ = bsoncore.AppendDocumentEnd(doc, didx)
  220. return doc, nil
  221. }
  222. func (bw *bulkWrite) runUpdate(ctx context.Context, batch bulkWriteBatch) (operation.UpdateResult, error) {
  223. docs := make([]bsoncore.Document, len(batch.models))
  224. for i, model := range batch.models {
  225. var doc bsoncore.Document
  226. var err error
  227. switch converted := model.(type) {
  228. case *ReplaceOneModel:
  229. doc, err = createUpdateDoc(converted.Filter, converted.Replacement, nil, converted.Collation, converted.Upsert, false,
  230. bw.collection.registry)
  231. case *UpdateOneModel:
  232. doc, err = createUpdateDoc(converted.Filter, converted.Update, converted.ArrayFilters, converted.Collation, converted.Upsert, false,
  233. bw.collection.registry)
  234. case *UpdateManyModel:
  235. doc, err = createUpdateDoc(converted.Filter, converted.Update, converted.ArrayFilters, converted.Collation, converted.Upsert, true,
  236. bw.collection.registry)
  237. }
  238. if err != nil {
  239. return operation.UpdateResult{}, err
  240. }
  241. docs[i] = doc
  242. }
  243. op := operation.NewUpdate(docs...).
  244. Session(bw.session).WriteConcern(bw.writeConcern).CommandMonitor(bw.collection.client.monitor).
  245. ServerSelector(bw.selector).ClusterClock(bw.collection.client.clock).
  246. Database(bw.collection.db.name).Collection(bw.collection.name).
  247. Deployment(bw.collection.client.topology)
  248. if bw.ordered != nil {
  249. op = op.Ordered(*bw.ordered)
  250. }
  251. if bw.bypassDocumentValidation != nil && *bw.bypassDocumentValidation {
  252. op = op.BypassDocumentValidation(*bw.bypassDocumentValidation)
  253. }
  254. retry := driver.RetryNone
  255. if bw.collection.client.retryWrites && batch.canRetry {
  256. retry = driver.RetryOncePerCommand
  257. }
  258. op = op.Retry(retry)
  259. err := op.Execute(ctx)
  260. return op.Result(), err
  261. }
  262. func createUpdateDoc(
  263. filter interface{},
  264. update interface{},
  265. arrayFilters *options.ArrayFilters,
  266. collation *options.Collation,
  267. upsert *bool,
  268. multi bool,
  269. registry *bsoncodec.Registry,
  270. ) (bsoncore.Document, error) {
  271. f, err := transformBsoncoreDocument(registry, filter)
  272. if err != nil {
  273. return nil, err
  274. }
  275. uidx, updateDoc := bsoncore.AppendDocumentStart(nil)
  276. updateDoc = bsoncore.AppendDocumentElement(updateDoc, "q", f)
  277. u, err := transformUpdateValue(registry, update, false)
  278. if err != nil {
  279. return nil, err
  280. }
  281. updateDoc = bsoncore.AppendValueElement(updateDoc, "u", u)
  282. updateDoc = bsoncore.AppendBooleanElement(updateDoc, "multi", multi)
  283. if arrayFilters != nil {
  284. arr, err := arrayFilters.ToArrayDocument()
  285. if err != nil {
  286. return nil, err
  287. }
  288. updateDoc = bsoncore.AppendArrayElement(updateDoc, "arrayFilters", arr)
  289. }
  290. if collation != nil {
  291. updateDoc = bsoncore.AppendDocumentElement(updateDoc, "collation", bsoncore.Document(collation.ToDocument()))
  292. }
  293. if upsert != nil {
  294. updateDoc = bsoncore.AppendBooleanElement(updateDoc, "upsert", *upsert)
  295. }
  296. updateDoc, _ = bsoncore.AppendDocumentEnd(updateDoc, uidx)
  297. return updateDoc, nil
  298. }
  299. func createBatches(models []WriteModel, ordered bool) []bulkWriteBatch {
  300. if ordered {
  301. return createOrderedBatches(models)
  302. }
  303. batches := make([]bulkWriteBatch, 5)
  304. batches[insertCommand].canRetry = true
  305. batches[deleteOneCommand].canRetry = true
  306. batches[updateOneCommand].canRetry = true
  307. // TODO(GODRIVER-1157): fix batching once operation retryability is fixed
  308. for _, model := range models {
  309. switch model.(type) {
  310. case *InsertOneModel:
  311. batches[insertCommand].models = append(batches[insertCommand].models, model)
  312. case *DeleteOneModel:
  313. batches[deleteOneCommand].models = append(batches[deleteOneCommand].models, model)
  314. case *DeleteManyModel:
  315. batches[deleteManyCommand].models = append(batches[deleteManyCommand].models, model)
  316. case *ReplaceOneModel, *UpdateOneModel:
  317. batches[updateOneCommand].models = append(batches[updateOneCommand].models, model)
  318. case *UpdateManyModel:
  319. batches[updateManyCommand].models = append(batches[updateManyCommand].models, model)
  320. }
  321. }
  322. return batches
  323. }
  324. func createOrderedBatches(models []WriteModel) []bulkWriteBatch {
  325. var batches []bulkWriteBatch
  326. var prevKind writeCommandKind = -1
  327. i := -1 // batch index
  328. for _, model := range models {
  329. var createNewBatch bool
  330. var canRetry bool
  331. var newKind writeCommandKind
  332. // TODO(GODRIVER-1157): fix batching once operation retryability is fixed
  333. switch model.(type) {
  334. case *InsertOneModel:
  335. createNewBatch = prevKind != insertCommand
  336. canRetry = true
  337. newKind = insertCommand
  338. case *DeleteOneModel:
  339. createNewBatch = prevKind != deleteOneCommand
  340. canRetry = true
  341. newKind = deleteOneCommand
  342. case *DeleteManyModel:
  343. createNewBatch = prevKind != deleteManyCommand
  344. newKind = deleteManyCommand
  345. case *ReplaceOneModel, *UpdateOneModel:
  346. createNewBatch = prevKind != updateOneCommand
  347. canRetry = true
  348. newKind = updateOneCommand
  349. case *UpdateManyModel:
  350. createNewBatch = prevKind != updateManyCommand
  351. newKind = updateManyCommand
  352. }
  353. if createNewBatch {
  354. batches = append(batches, bulkWriteBatch{
  355. models: []WriteModel{model},
  356. canRetry: canRetry,
  357. })
  358. i++
  359. } else {
  360. batches[i].models = append(batches[i].models, model)
  361. if !canRetry {
  362. batches[i].canRetry = false // don't make it true if it was already false
  363. }
  364. }
  365. prevKind = newKind
  366. }
  367. return batches
  368. }
  369. func (bw *bulkWrite) mergeResults(newResult BulkWriteResult, opIndex int64) {
  370. bw.result.InsertedCount += newResult.InsertedCount
  371. bw.result.MatchedCount += newResult.MatchedCount
  372. bw.result.ModifiedCount += newResult.ModifiedCount
  373. bw.result.DeletedCount += newResult.DeletedCount
  374. bw.result.UpsertedCount += newResult.UpsertedCount
  375. for index, upsertID := range newResult.UpsertedIDs {
  376. bw.result.UpsertedIDs[index+opIndex] = upsertID
  377. }
  378. }
  // writeCommandKind identifies the kind of write command that a write model
  // is sent with. It is a signed type; -1 is used as a "no kind yet" marker
  // when ordered batches are built.
  type writeCommandKind int8
  381. // These constants represent the valid types of write commands.
  382. const (
  383. insertCommand writeCommandKind = iota
  384. updateOneCommand
  385. updateManyCommand
  386. deleteOneCommand
  387. deleteManyCommand
  388. )