You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

change_stream.go 15 kB

4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
  1. // Copyright (C) MongoDB, Inc. 2017-present.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. // not use this file except in compliance with the License. You may obtain
  5. // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
  6. package mongo
  7. import (
  8. "context"
  9. "errors"
  10. "fmt"
  11. "reflect"
  12. "strconv"
  13. "time"
  14. "go.mongodb.org/mongo-driver/bson"
  15. "go.mongodb.org/mongo-driver/bson/bsoncodec"
  16. "go.mongodb.org/mongo-driver/bson/primitive"
  17. "go.mongodb.org/mongo-driver/mongo/options"
  18. "go.mongodb.org/mongo-driver/mongo/readconcern"
  19. "go.mongodb.org/mongo-driver/mongo/readpref"
  20. "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
  21. "go.mongodb.org/mongo-driver/x/mongo/driver"
  22. "go.mongodb.org/mongo-driver/x/mongo/driver/description"
  23. "go.mongodb.org/mongo-driver/x/mongo/driver/operation"
  24. "go.mongodb.org/mongo-driver/x/mongo/driver/session"
  25. )
  26. const errorInterrupted int32 = 11601
  27. const errorCappedPositionLost int32 = 136
  28. const errorCursorKilled int32 = 237
  29. // ErrMissingResumeToken indicates that a change stream notification from the server did not
  30. // contain a resume token.
  31. var ErrMissingResumeToken = errors.New("cannot provide resume functionality when the resume token is missing")
  32. // ErrNilCursor indicates that the cursor for the change stream is nil.
  33. var ErrNilCursor = errors.New("cursor is nil")
  34. // ChangeStream instances iterate a stream of change documents. Each document can be decoded via the
  35. // Decode method. Resume tokens should be retrieved via the ResumeToken method and can be stored to
  36. // resume the change stream at a specific point in time.
  37. //
  38. // A typical usage of the ChangeStream type would be:
  39. type ChangeStream struct {
  40. Current bson.Raw
  41. aggregate *operation.Aggregate
  42. pipelineSlice []bsoncore.Document
  43. cursor changeStreamCursor
  44. cursorOptions driver.CursorOptions
  45. batch []bsoncore.Document
  46. resumeToken bson.Raw
  47. err error
  48. sess *session.Client
  49. client *Client
  50. registry *bsoncodec.Registry
  51. streamType StreamType
  52. options *options.ChangeStreamOptions
  53. selector description.ServerSelector
  54. operationTime *primitive.Timestamp
  55. }
  56. type changeStreamConfig struct {
  57. readConcern *readconcern.ReadConcern
  58. readPreference *readpref.ReadPref
  59. client *Client
  60. registry *bsoncodec.Registry
  61. streamType StreamType
  62. collectionName string
  63. databaseName string
  64. }
  65. func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline interface{},
  66. opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
  67. if ctx == nil {
  68. ctx = context.Background()
  69. }
  70. cs := &ChangeStream{
  71. client: config.client,
  72. registry: config.registry,
  73. streamType: config.streamType,
  74. options: options.MergeChangeStreamOptions(opts...),
  75. selector: description.ReadPrefSelector(config.readPreference),
  76. }
  77. cs.sess = sessionFromContext(ctx)
  78. if cs.sess == nil && cs.client.topology.SessionPool != nil {
  79. cs.sess, cs.err = session.NewClientSession(cs.client.topology.SessionPool, cs.client.id, session.Implicit)
  80. if cs.err != nil {
  81. return nil, cs.Err()
  82. }
  83. }
  84. if cs.err = cs.client.validSession(cs.sess); cs.err != nil {
  85. closeImplicitSession(cs.sess)
  86. return nil, cs.Err()
  87. }
  88. cs.aggregate = operation.NewAggregate(nil).
  89. ReadPreference(config.readPreference).ReadConcern(config.readConcern).
  90. Deployment(cs.client.topology).ClusterClock(cs.client.clock).
  91. CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryNone)
  92. if cs.options.Collation != nil {
  93. cs.aggregate.Collation(bsoncore.Document(cs.options.Collation.ToDocument()))
  94. }
  95. if cs.options.BatchSize != nil {
  96. cs.aggregate.BatchSize(*cs.options.BatchSize)
  97. cs.cursorOptions.BatchSize = *cs.options.BatchSize
  98. }
  99. if cs.options.MaxAwaitTime != nil {
  100. cs.cursorOptions.MaxTimeMS = int64(time.Duration(*cs.options.MaxAwaitTime) / time.Millisecond)
  101. }
  102. cs.cursorOptions.CommandMonitor = cs.client.monitor
  103. switch cs.streamType {
  104. case ClientStream:
  105. cs.aggregate.Database("admin")
  106. case DatabaseStream:
  107. cs.aggregate.Database(config.databaseName)
  108. case CollectionStream:
  109. cs.aggregate.Collection(config.collectionName).Database(config.databaseName)
  110. default:
  111. closeImplicitSession(cs.sess)
  112. return nil, fmt.Errorf("must supply a valid StreamType in config, instead of %v", cs.streamType)
  113. }
  114. // When starting a change stream, cache startAfter as the first resume token if it is set. If not, cache
  115. // resumeAfter. If neither is set, do not cache a resume token.
  116. resumeToken := cs.options.StartAfter
  117. if resumeToken == nil {
  118. resumeToken = cs.options.ResumeAfter
  119. }
  120. var marshaledToken bson.Raw
  121. if resumeToken != nil {
  122. if marshaledToken, cs.err = bson.Marshal(resumeToken); cs.err != nil {
  123. closeImplicitSession(cs.sess)
  124. return nil, cs.Err()
  125. }
  126. }
  127. cs.resumeToken = marshaledToken
  128. if cs.err = cs.buildPipelineSlice(pipeline); cs.err != nil {
  129. closeImplicitSession(cs.sess)
  130. return nil, cs.Err()
  131. }
  132. var pipelineArr bsoncore.Document
  133. pipelineArr, cs.err = cs.pipelineToBSON()
  134. cs.aggregate.Pipeline(pipelineArr)
  135. if cs.err = cs.executeOperation(ctx, false); cs.err != nil {
  136. closeImplicitSession(cs.sess)
  137. return nil, cs.Err()
  138. }
  139. return cs, cs.Err()
  140. }
  141. func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error {
  142. var server driver.Server
  143. var conn driver.Connection
  144. var err error
  145. if server, cs.err = cs.client.topology.SelectServer(ctx, cs.selector); cs.err != nil {
  146. return cs.Err()
  147. }
  148. if conn, cs.err = server.Connection(ctx); cs.err != nil {
  149. return cs.Err()
  150. }
  151. defer conn.Close()
  152. cs.aggregate.Deployment(driver.SingleConnectionDeployment{
  153. C: conn,
  154. })
  155. if resuming {
  156. cs.replaceOptions(ctx, conn.Description().WireVersion) // pass wire version
  157. csOptDoc := cs.createPipelineOptionsDoc()
  158. pipIdx, pipDoc := bsoncore.AppendDocumentStart(nil)
  159. pipDoc = bsoncore.AppendDocumentElement(pipDoc, "$changeStream", csOptDoc)
  160. if pipDoc, cs.err = bsoncore.AppendDocumentEnd(pipDoc, pipIdx); cs.err != nil {
  161. return cs.Err()
  162. }
  163. cs.pipelineSlice[0] = pipDoc
  164. var plArr bsoncore.Document
  165. if plArr, cs.err = cs.pipelineToBSON(); cs.err != nil {
  166. return cs.Err()
  167. }
  168. cs.aggregate.Pipeline(plArr)
  169. }
  170. if original := cs.aggregate.Execute(ctx); original != nil {
  171. wireVersion := conn.Description().WireVersion
  172. retryableRead := cs.client.retryReads && wireVersion != nil && wireVersion.Max >= 6
  173. if !retryableRead {
  174. cs.err = replaceErrors(original)
  175. return cs.err
  176. }
  177. cs.err = original
  178. switch tt := original.(type) {
  179. case driver.Error:
  180. if !tt.Retryable() {
  181. break
  182. }
  183. server, err = cs.client.topology.SelectServer(ctx, cs.selector)
  184. if err != nil {
  185. break
  186. }
  187. conn.Close()
  188. conn, err = server.Connection(ctx)
  189. defer conn.Close()
  190. if err != nil {
  191. break
  192. }
  193. wireVersion := conn.Description().WireVersion
  194. if wireVersion == nil || wireVersion.Max < 6 {
  195. break
  196. }
  197. cs.aggregate.Deployment(driver.SingleConnectionDeployment{
  198. C: conn,
  199. })
  200. cs.err = cs.aggregate.Execute(ctx)
  201. }
  202. if cs.err != nil {
  203. cs.err = replaceErrors(cs.err)
  204. return cs.Err()
  205. }
  206. }
  207. cs.err = nil
  208. cr := cs.aggregate.ResultCursorResponse()
  209. cr.Server = server
  210. cs.cursor, cs.err = driver.NewBatchCursor(cr, cs.sess, cs.client.clock, cs.cursorOptions)
  211. if cs.err = replaceErrors(cs.err); cs.err != nil {
  212. return cs.Err()
  213. }
  214. cs.updatePbrtFromCommand()
  215. if cs.options.StartAtOperationTime == nil && cs.options.ResumeAfter == nil &&
  216. cs.options.StartAfter == nil && conn.Description().WireVersion.Max >= 7 &&
  217. cs.emptyBatch() && cs.resumeToken == nil {
  218. cs.operationTime = cs.sess.OperationTime
  219. }
  220. return cs.Err()
  221. }
  222. // Updates the post batch resume token after a successful aggregate or getMore operation.
  223. func (cs *ChangeStream) updatePbrtFromCommand() {
  224. // Only cache the pbrt if an empty batch was returned and a pbrt was included
  225. if pbrt := cs.cursor.PostBatchResumeToken(); cs.emptyBatch() && pbrt != nil {
  226. cs.resumeToken = bson.Raw(pbrt)
  227. }
  228. }
  229. func (cs *ChangeStream) storeResumeToken() error {
  230. // If cs.Current is the last document in the batch and a pbrt is included, cache the pbrt
  231. // Otherwise, cache the _id of the document
  232. var tokenDoc bson.Raw
  233. if len(cs.batch) == 0 {
  234. if pbrt := cs.cursor.PostBatchResumeToken(); pbrt != nil {
  235. tokenDoc = bson.Raw(pbrt)
  236. }
  237. }
  238. if tokenDoc == nil {
  239. var ok bool
  240. tokenDoc, ok = cs.Current.Lookup("_id").DocumentOK()
  241. if !ok {
  242. _ = cs.Close(context.Background())
  243. return ErrMissingResumeToken
  244. }
  245. }
  246. cs.resumeToken = tokenDoc
  247. return nil
  248. }
  249. func (cs *ChangeStream) buildPipelineSlice(pipeline interface{}) error {
  250. val := reflect.ValueOf(pipeline)
  251. if !val.IsValid() || !(val.Kind() == reflect.Slice) {
  252. cs.err = errors.New("can only transform slices and arrays into aggregation pipelines, but got invalid")
  253. return cs.err
  254. }
  255. cs.pipelineSlice = make([]bsoncore.Document, 0, val.Len()+1)
  256. csIdx, csDoc := bsoncore.AppendDocumentStart(nil)
  257. csDocTemp := cs.createPipelineOptionsDoc()
  258. if cs.err != nil {
  259. return cs.err
  260. }
  261. csDoc = bsoncore.AppendDocumentElement(csDoc, "$changeStream", csDocTemp)
  262. csDoc, cs.err = bsoncore.AppendDocumentEnd(csDoc, csIdx)
  263. if cs.err != nil {
  264. return cs.err
  265. }
  266. cs.pipelineSlice = append(cs.pipelineSlice, csDoc)
  267. for i := 0; i < val.Len(); i++ {
  268. var elem []byte
  269. elem, cs.err = transformBsoncoreDocument(cs.registry, val.Index(i).Interface())
  270. if cs.err != nil {
  271. return cs.err
  272. }
  273. cs.pipelineSlice = append(cs.pipelineSlice, elem)
  274. }
  275. return cs.err
  276. }
  277. func (cs *ChangeStream) createPipelineOptionsDoc() bsoncore.Document {
  278. plDocIdx, plDoc := bsoncore.AppendDocumentStart(nil)
  279. if cs.streamType == ClientStream {
  280. plDoc = bsoncore.AppendBooleanElement(plDoc, "allChangesForCluster", true)
  281. }
  282. if cs.options.FullDocument != nil {
  283. plDoc = bsoncore.AppendStringElement(plDoc, "fullDocument", string(*cs.options.FullDocument))
  284. }
  285. if cs.options.ResumeAfter != nil {
  286. var raDoc bsoncore.Document
  287. raDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.ResumeAfter)
  288. if cs.err != nil {
  289. return nil
  290. }
  291. plDoc = bsoncore.AppendDocumentElement(plDoc, "resumeAfter", raDoc)
  292. }
  293. if cs.options.StartAfter != nil {
  294. var saDoc bsoncore.Document
  295. saDoc, cs.err = transformBsoncoreDocument(cs.registry, cs.options.StartAfter)
  296. if cs.err != nil {
  297. return nil
  298. }
  299. plDoc = bsoncore.AppendDocumentElement(plDoc, "startAfter", saDoc)
  300. }
  301. if cs.options.StartAtOperationTime != nil {
  302. plDoc = bsoncore.AppendTimestampElement(plDoc, "startAtOperationTime", cs.options.StartAtOperationTime.T, cs.options.StartAtOperationTime.I)
  303. }
  304. if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil {
  305. return nil
  306. }
  307. return plDoc
  308. }
  309. func (cs *ChangeStream) pipelineToBSON() (bsoncore.Document, error) {
  310. pipelineDocIdx, pipelineArr := bsoncore.AppendArrayStart(nil)
  311. for i, doc := range cs.pipelineSlice {
  312. pipelineArr = bsoncore.AppendDocumentElement(pipelineArr, strconv.Itoa(i), doc)
  313. }
  314. if pipelineArr, cs.err = bsoncore.AppendArrayEnd(pipelineArr, pipelineDocIdx); cs.err != nil {
  315. return nil, cs.err
  316. }
  317. return pipelineArr, cs.err
  318. }
  319. func (cs *ChangeStream) replaceOptions(ctx context.Context, wireVersion *description.VersionRange) {
  320. // Cached resume token: use the resume token as the resumeAfter option and set no other resume options
  321. if cs.resumeToken != nil {
  322. cs.options.SetResumeAfter(cs.resumeToken)
  323. cs.options.SetStartAfter(nil)
  324. cs.options.SetStartAtOperationTime(nil)
  325. return
  326. }
  327. // No cached resume token but cached operation time: use the operation time as the startAtOperationTime option and
  328. // set no other resume options
  329. if (cs.sess.OperationTime != nil || cs.options.StartAtOperationTime != nil) && wireVersion.Max >= 7 {
  330. opTime := cs.options.StartAtOperationTime
  331. if cs.operationTime != nil {
  332. opTime = cs.sess.OperationTime
  333. }
  334. cs.options.SetStartAtOperationTime(opTime)
  335. cs.options.SetResumeAfter(nil)
  336. cs.options.SetStartAfter(nil)
  337. return
  338. }
  339. // No cached resume token or operation time: set none of the resume options
  340. cs.options.SetResumeAfter(nil)
  341. cs.options.SetStartAfter(nil)
  342. cs.options.SetStartAtOperationTime(nil)
  343. }
  344. // ID returns the cursor ID for this change stream.
  345. func (cs *ChangeStream) ID() int64 {
  346. if cs.cursor == nil {
  347. return 0
  348. }
  349. return cs.cursor.ID()
  350. }
  351. // Decode will decode the current document into val.
  352. func (cs *ChangeStream) Decode(val interface{}) error {
  353. if cs.cursor == nil {
  354. return ErrNilCursor
  355. }
  356. return bson.UnmarshalWithRegistry(cs.registry, cs.Current, val)
  357. }
  358. // Err returns the current error.
  359. func (cs *ChangeStream) Err() error {
  360. if cs.err != nil {
  361. return replaceErrors(cs.err)
  362. }
  363. if cs.cursor == nil {
  364. return nil
  365. }
  366. return replaceErrors(cs.cursor.Err())
  367. }
  368. // Close closes this cursor.
  369. func (cs *ChangeStream) Close(ctx context.Context) error {
  370. if ctx == nil {
  371. ctx = context.Background()
  372. }
  373. defer closeImplicitSession(cs.sess)
  374. if cs.cursor == nil {
  375. return nil // cursor is already closed
  376. }
  377. cs.err = replaceErrors(cs.cursor.Close(ctx))
  378. cs.cursor = nil
  379. return cs.Err()
  380. }
  381. // ResumeToken returns the last cached resume token for this change stream.
  382. func (cs *ChangeStream) ResumeToken() bson.Raw {
  383. return cs.resumeToken
  384. }
  385. // Next gets the next result from this change stream. Returns true if there were no errors and the next
  386. // result is available for decoding.
  387. func (cs *ChangeStream) Next(ctx context.Context) bool {
  388. if ctx == nil {
  389. ctx = context.Background()
  390. }
  391. if len(cs.batch) == 0 {
  392. cs.loopNext(ctx)
  393. if cs.err != nil || len(cs.batch) == 0 {
  394. cs.err = replaceErrors(cs.err)
  395. return false
  396. }
  397. }
  398. cs.Current = bson.Raw(cs.batch[0])
  399. cs.batch = cs.batch[1:]
  400. if cs.err = cs.storeResumeToken(); cs.err != nil {
  401. return false
  402. }
  403. return true
  404. }
  405. func (cs *ChangeStream) loopNext(ctx context.Context) {
  406. for {
  407. if cs.cursor == nil {
  408. return
  409. }
  410. if cs.cursor.Next(ctx) {
  411. // If this is the first batch, the batch cursor will return true, but the batch could be empty.
  412. if cs.batch, cs.err = cs.cursor.Batch().Documents(); cs.err != nil || len(cs.batch) > 0 {
  413. return
  414. }
  415. // no error but empty batch
  416. cs.updatePbrtFromCommand()
  417. continue
  418. }
  419. cs.err = replaceErrors(cs.cursor.Err())
  420. if cs.err == nil {
  421. // If a getMore was done but the batch was empty, the batch cursor will return false with no error
  422. if len(cs.batch) == 0 {
  423. continue
  424. }
  425. return
  426. }
  427. switch t := cs.err.(type) {
  428. case CommandError:
  429. if t.Code == errorInterrupted || t.Code == errorCappedPositionLost || t.Code == errorCursorKilled || t.HasErrorLabel("NonResumableChangeStreamError") {
  430. return
  431. }
  432. }
  433. // ignore error from cursor close because if the cursor is deleted or errors we tried to close it and will remake and try to get next batch
  434. _ = cs.cursor.Close(ctx)
  435. if cs.err = cs.executeOperation(ctx, true); cs.err != nil {
  436. return
  437. }
  438. }
  439. }
  440. // Returns true if the underlying cursor's batch is empty
  441. func (cs *ChangeStream) emptyBatch() bool {
  442. return cs.cursor.Batch().Empty()
  443. }
  444. // StreamType represents the type of a change stream.
  445. type StreamType uint8
  446. // These constants represent valid change stream types. A change stream can be initialized over a collection, all
  447. // collections in a database, or over a whole client.
  448. const (
  449. CollectionStream StreamType = iota
  450. DatabaseStream
  451. ClientStream
  452. )