You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

subscription.go 27 kB

4 years ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741
  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "strings"
  21. "sync"
  22. "time"
  23. "cloud.google.com/go/iam"
  24. "cloud.google.com/go/internal/optional"
  25. "github.com/golang/protobuf/ptypes"
  26. durpb "github.com/golang/protobuf/ptypes/duration"
  27. gax "github.com/googleapis/gax-go/v2"
  28. "golang.org/x/sync/errgroup"
  29. pb "google.golang.org/genproto/googleapis/pubsub/v1"
  30. fmpb "google.golang.org/genproto/protobuf/field_mask"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/status"
  33. )
  34. // Subscription is a reference to a PubSub subscription.
  35. type Subscription struct {
  36. c *Client
  37. // The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
  38. name string
  39. // Settings for pulling messages. Configure these before calling Receive.
  40. ReceiveSettings ReceiveSettings
  41. mu sync.Mutex
  42. receiveActive bool
  43. }
  44. // Subscription creates a reference to a subscription.
  45. func (c *Client) Subscription(id string) *Subscription {
  46. return c.SubscriptionInProject(id, c.projectID)
  47. }
  48. // SubscriptionInProject creates a reference to a subscription in a given project.
  49. func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
  50. return &Subscription{
  51. c: c,
  52. name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
  53. }
  54. }
  55. // String returns the globally unique printable name of the subscription.
  56. func (s *Subscription) String() string {
  57. return s.name
  58. }
  59. // ID returns the unique identifier of the subscription within its project.
  60. func (s *Subscription) ID() string {
  61. slash := strings.LastIndex(s.name, "/")
  62. if slash == -1 {
  63. // name is not a fully-qualified name.
  64. panic("bad subscription name")
  65. }
  66. return s.name[slash+1:]
  67. }
  68. // Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
  69. func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
  70. it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{
  71. Project: c.fullyQualifiedProjectName(),
  72. })
  73. return &SubscriptionIterator{
  74. c: c,
  75. next: func() (string, error) {
  76. sub, err := it.Next()
  77. if err != nil {
  78. return "", err
  79. }
  80. return sub.Name, nil
  81. },
  82. }
  83. }
  84. // SubscriptionIterator is an iterator that returns a series of subscriptions.
  85. type SubscriptionIterator struct {
  86. c *Client
  87. next func() (string, error)
  88. }
  89. // Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
  90. func (subs *SubscriptionIterator) Next() (*Subscription, error) {
  91. subName, err := subs.next()
  92. if err != nil {
  93. return nil, err
  94. }
  95. return &Subscription{c: subs.c, name: subName}, nil
  96. }
  97. // PushConfig contains configuration for subscriptions that operate in push mode.
  98. type PushConfig struct {
  99. // A URL locating the endpoint to which messages should be pushed.
  100. Endpoint string
  101. // Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
  102. Attributes map[string]string
  103. // AuthenticationMethod is used by push endpoints to verify the source
  104. // of push requests.
  105. // It can be used with push endpoints that are private by default to
  106. // allow requests only from the Cloud Pub/Sub system, for example.
  107. // This field is optional and should be set only by users interested in
  108. // authenticated push.
  109. //
  110. // It is EXPERIMENTAL and a part of a closed alpha that may not be
  111. // accessible to all users. This field is subject to change or removal
  112. // without notice.
  113. AuthenticationMethod AuthenticationMethod
  114. }
  115. func (pc *PushConfig) toProto() *pb.PushConfig {
  116. if pc == nil {
  117. return nil
  118. }
  119. pbCfg := &pb.PushConfig{
  120. Attributes: pc.Attributes,
  121. PushEndpoint: pc.Endpoint,
  122. }
  123. if authMethod := pc.AuthenticationMethod; authMethod != nil {
  124. switch am := authMethod.(type) {
  125. case *OIDCToken:
  126. pbCfg.AuthenticationMethod = am.toProto()
  127. default: // TODO: add others here when GAIC adds more definitions.
  128. }
  129. }
  130. return pbCfg
  131. }
  // AuthenticationMethod is implemented by the types that a push endpoint can
  // use to verify the source of push requests.
  //
  // The interface describes fields that are part of a closed alpha and may not
  // be accessible to all users.
  type AuthenticationMethod interface {
  	// isAuthMethod is unexported, so only types in this package can
  	// implement the interface.
  	isAuthMethod() bool
  }
  // OIDCToken lets a PushConfig be authenticated with the OpenID Connect
  // protocol, see https://openid.net/connect/
  type OIDCToken struct {
  	// Audience is the audience claim used when the OIDC token is generated.
  	// It identifies the recipients the JWT is intended for and is a single
  	// case-sensitive string; multiple values (an array) are not supported.
  	// More about the OIDC JWT audience claim:
  	// https://tools.ietf.org/html/rfc7519#section-4.1.3
  	// Note: if not specified, the Push endpoint URL will be used.
  	Audience string

  	// ServiceAccountEmail is the service account email used to generate the
  	// OpenID Connect token. Callers of
  	//  * CreateSubscription
  	//  * UpdateSubscription
  	//  * ModifyPushConfig
  	// must have the iam.serviceAccounts.actAs permission for that service
  	// account. See
  	// https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles.
  	ServiceAccountEmail string
  }
  157. var _ AuthenticationMethod = (*OIDCToken)(nil)
  158. func (oidcToken *OIDCToken) isAuthMethod() bool { return true }
  159. func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
  160. if oidcToken == nil {
  161. return nil
  162. }
  163. return &pb.PushConfig_OidcToken_{
  164. OidcToken: &pb.PushConfig_OidcToken{
  165. Audience: oidcToken.Audience,
  166. ServiceAccountEmail: oidcToken.ServiceAccountEmail,
  167. },
  168. }
  169. }
  170. // SubscriptionConfig describes the configuration of a subscription.
  171. type SubscriptionConfig struct {
  172. Topic *Topic
  173. PushConfig PushConfig
  174. // The default maximum time after a subscriber receives a message before
  175. // the subscriber should acknowledge the message. Note: messages which are
  176. // obtained via Subscription.Receive need not be acknowledged within this
  177. // deadline, as the deadline will be automatically extended.
  178. AckDeadline time.Duration
  179. // Whether to retain acknowledged messages. If true, acknowledged messages
  180. // will not be expunged until they fall out of the RetentionDuration window.
  181. RetainAckedMessages bool
  182. // How long to retain messages in backlog, from the time of publish. If
  183. // RetainAckedMessages is true, this duration affects the retention of
  184. // acknowledged messages, otherwise only unacknowledged messages are retained.
  185. // Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
  186. RetentionDuration time.Duration
  187. // Expiration policy specifies the conditions for a subscription's expiration.
  188. // A subscription is considered active as long as any connected subscriber is
  189. // successfully consuming messages from the subscription or is issuing
  190. // operations on the subscription. If `expiration_policy` is not set, a
  191. // *default policy* with `ttl` of 31 days will be used. The minimum allowed
  192. // value for `expiration_policy.ttl` is 1 day.
  193. //
  194. // Use time.Duration(0) to indicate that the subscription should never expire.
  195. //
  196. // It is EXPERIMENTAL and subject to change or removal without notice.
  197. ExpirationPolicy optional.Duration
  198. // The set of labels for the subscription.
  199. Labels map[string]string
  200. }
  201. func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
  202. var pbPushConfig *pb.PushConfig
  203. if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil {
  204. pbPushConfig = cfg.PushConfig.toProto()
  205. }
  206. var retentionDuration *durpb.Duration
  207. if cfg.RetentionDuration != 0 {
  208. retentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
  209. }
  210. return &pb.Subscription{
  211. Name: name,
  212. Topic: cfg.Topic.name,
  213. PushConfig: pbPushConfig,
  214. AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())),
  215. RetainAckedMessages: cfg.RetainAckedMessages,
  216. MessageRetentionDuration: retentionDuration,
  217. Labels: cfg.Labels,
  218. ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy),
  219. }
  220. }
  221. func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) {
  222. rd := time.Hour * 24 * 7
  223. var err error
  224. if pbSub.MessageRetentionDuration != nil {
  225. rd, err = ptypes.Duration(pbSub.MessageRetentionDuration)
  226. if err != nil {
  227. return SubscriptionConfig{}, err
  228. }
  229. }
  230. var expirationPolicy time.Duration
  231. if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil {
  232. expirationPolicy, err = ptypes.Duration(ttl)
  233. if err != nil {
  234. return SubscriptionConfig{}, err
  235. }
  236. }
  237. subC := SubscriptionConfig{
  238. Topic: newTopic(c, pbSub.Topic),
  239. AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
  240. RetainAckedMessages: pbSub.RetainAckedMessages,
  241. RetentionDuration: rd,
  242. Labels: pbSub.Labels,
  243. ExpirationPolicy: expirationPolicy,
  244. }
  245. pc := protoToPushConfig(pbSub.PushConfig)
  246. if pc != nil {
  247. subC.PushConfig = *pc
  248. }
  249. return subC, nil
  250. }
  251. func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
  252. if pbPc == nil {
  253. return nil
  254. }
  255. pc := &PushConfig{
  256. Endpoint: pbPc.PushEndpoint,
  257. Attributes: pbPc.Attributes,
  258. }
  259. if am := pbPc.AuthenticationMethod; am != nil {
  260. if oidcToken, ok := am.(*pb.PushConfig_OidcToken_); ok && oidcToken != nil && oidcToken.OidcToken != nil {
  261. pc.AuthenticationMethod = &OIDCToken{
  262. Audience: oidcToken.OidcToken.GetAudience(),
  263. ServiceAccountEmail: oidcToken.OidcToken.GetServiceAccountEmail(),
  264. }
  265. }
  266. }
  267. return pc
  268. }
  // ReceiveSettings configures the Receive method.
  // A zero ReceiveSettings behaves like DefaultReceiveSettings.
  type ReceiveSettings struct {
  	// MaxExtension is the maximum period for which the Subscription
  	// automatically extends the ack deadline of each message.
  	//
  	// The ack deadline of every fetched Message is extended automatically
  	// up to this duration. Specify a duration less than 0 to disable
  	// automatic extension beyond the initial receipt.
  	MaxExtension time.Duration

  	// MaxOutstandingMessages is the maximum number of unprocessed messages
  	// (unacknowledged but not yet expired). Zero is treated as
  	// DefaultReceiveSettings.MaxOutstandingMessages. A negative value means
  	// there is no limit on the number of unprocessed messages.
  	MaxOutstandingMessages int

  	// MaxOutstandingBytes is the maximum total size of unprocessed messages
  	// (unacknowledged but not yet expired). Zero is treated as
  	// DefaultReceiveSettings.MaxOutstandingBytes. A negative value means
  	// there is no limit on the number of bytes for unprocessed messages.
  	MaxOutstandingBytes int

  	// NumGoroutines is the number of goroutines Receive spawns to pull
  	// messages concurrently. A value less than 1 is treated as
  	// DefaultReceiveSettings.NumGoroutines.
  	//
  	// NumGoroutines does not limit how many messages are processed
  	// concurrently. Even with one goroutine many messages might be
  	// processed at once, because that goroutine may continually receive
  	// messages and invoke the function passed to Receive on them. To limit
  	// the number of messages being processed concurrently, set
  	// MaxOutstandingMessages.
  	NumGoroutines int

  	// Synchronous, if true, ensures that no more than
  	// MaxOutstandingMessages are in memory at one time. (In contrast, when
  	// Synchronous is false, more than MaxOutstandingMessages may have been
  	// received from the service and be in memory before being processed.)
  	// MaxOutstandingBytes still refers to the total bytes processed, rather
  	// than in memory. NumGoroutines is ignored.
  	// The default is false.
  	Synchronous bool
  }
  // synchronousWaitTime is how long synchronous receive pauses when it is
  // already processing MaxOutstandingMessages. Calling Pull to ask for zero
  // messages is pointless, so we wait to let some message-processing callbacks
  // finish.
  //
  // The pause is long enough to avoid consuming significant CPU, yet short
  // enough to provide decent throughput. Users who want better throughput
  // should not be using synchronous mode.
  //
  // Waiting might look like polling, and it is tempting to instead notice when
  // a callback finishes and call Pull immediately. But if callbacks finish in
  // quick succession, that would produce frequent Pull RPCs that each request
  // a single message, which wastes network bandwidth. It is better to wait for
  // a few callbacks to finish, so we make fewer RPCs fetching more messages.
  //
  // The value is unexported so the user doesn't have another knob to think
  // about. Note that it is the same value as the one used for nackTicker, so
  // it matches this client's idea of a duration that is short, but not so
  // short that we perform excessive RPCs.
  const synchronousWaitTime time.Duration = 100 * time.Millisecond
  // minAckDeadline is a variable rather than a constant so that tests can
  // change it.
  var minAckDeadline = time.Second * 10
  330. // DefaultReceiveSettings holds the default values for ReceiveSettings.
  331. var DefaultReceiveSettings = ReceiveSettings{
  332. MaxExtension: 10 * time.Minute,
  333. MaxOutstandingMessages: 1000,
  334. MaxOutstandingBytes: 1e9, // 1G
  335. NumGoroutines: 1,
  336. }
  337. // Delete deletes the subscription.
  338. func (s *Subscription) Delete(ctx context.Context) error {
  339. return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name})
  340. }
  341. // Exists reports whether the subscription exists on the server.
  342. func (s *Subscription) Exists(ctx context.Context) (bool, error) {
  343. _, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
  344. if err == nil {
  345. return true, nil
  346. }
  347. if status.Code(err) == codes.NotFound {
  348. return false, nil
  349. }
  350. return false, err
  351. }
  352. // Config fetches the current configuration for the subscription.
  353. func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {
  354. pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
  355. if err != nil {
  356. return SubscriptionConfig{}, err
  357. }
  358. cfg, err := protoToSubscriptionConfig(pbSub, s.c)
  359. if err != nil {
  360. return SubscriptionConfig{}, err
  361. }
  362. return cfg, nil
  363. }
  364. // SubscriptionConfigToUpdate describes how to update a subscription.
  365. type SubscriptionConfigToUpdate struct {
  366. // If non-nil, the push config is changed.
  367. PushConfig *PushConfig
  368. // If non-zero, the ack deadline is changed.
  369. AckDeadline time.Duration
  370. // If set, RetainAckedMessages is changed.
  371. RetainAckedMessages optional.Bool
  372. // If non-zero, RetentionDuration is changed.
  373. RetentionDuration time.Duration
  374. // If non-zero, Expiration is changed.
  375. ExpirationPolicy optional.Duration
  376. // If non-nil, the current set of labels is completely
  377. // replaced by the new set.
  378. // This field has beta status. It is not subject to the stability guarantee
  379. // and may change.
  380. Labels map[string]string
  381. }
  382. // Update changes an existing subscription according to the fields set in cfg.
  383. // It returns the new SubscriptionConfig.
  384. //
  385. // Update returns an error if no fields were modified.
  386. func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) {
  387. req := s.updateRequest(&cfg)
  388. if err := cfg.validate(); err != nil {
  389. return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %v", err)
  390. }
  391. if len(req.UpdateMask.Paths) == 0 {
  392. return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update")
  393. }
  394. rpsub, err := s.c.subc.UpdateSubscription(ctx, req)
  395. if err != nil {
  396. return SubscriptionConfig{}, err
  397. }
  398. return protoToSubscriptionConfig(rpsub, s.c)
  399. }
  400. func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest {
  401. psub := &pb.Subscription{Name: s.name}
  402. var paths []string
  403. if cfg.PushConfig != nil {
  404. psub.PushConfig = cfg.PushConfig.toProto()
  405. paths = append(paths, "push_config")
  406. }
  407. if cfg.AckDeadline != 0 {
  408. psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
  409. paths = append(paths, "ack_deadline_seconds")
  410. }
  411. if cfg.RetainAckedMessages != nil {
  412. psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages)
  413. paths = append(paths, "retain_acked_messages")
  414. }
  415. if cfg.RetentionDuration != 0 {
  416. psub.MessageRetentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
  417. paths = append(paths, "message_retention_duration")
  418. }
  419. if cfg.ExpirationPolicy != nil {
  420. psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy)
  421. paths = append(paths, "expiration_policy")
  422. }
  423. if cfg.Labels != nil {
  424. psub.Labels = cfg.Labels
  425. paths = append(paths, "labels")
  426. }
  427. return &pb.UpdateSubscriptionRequest{
  428. Subscription: psub,
  429. UpdateMask: &fmpb.FieldMask{Paths: paths},
  430. }
  431. }
  const (
  	// minExpirationPolicy is the shortest expiration policy duration that is
  	// accepted, 1 day, as per:
  	//    https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607
  	minExpirationPolicy time.Duration = 24 * time.Hour

  	// defaultExpirationPolicy is the 31-day default used when an expiration
  	// policy is not specified, as per:
  	//    https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606
  	defaultExpirationPolicy time.Duration = 31 * 24 * time.Hour
  )
  440. func (cfg *SubscriptionConfigToUpdate) validate() error {
  441. if cfg == nil || cfg.ExpirationPolicy == nil {
  442. return nil
  443. }
  444. policy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy
  445. if policy == 0 || policy >= min {
  446. return nil
  447. }
  448. return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", policy, min)
  449. }
  450. func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy {
  451. if expirationPolicy == nil {
  452. return nil
  453. }
  454. dur := optional.ToDuration(expirationPolicy)
  455. var ttl *durpb.Duration
  456. // As per:
  457. // https://godoc.org/google.golang.org/genproto/googleapis/pubsub/v1#ExpirationPolicy.Ttl
  458. // if ExpirationPolicy.Ttl is set to nil, the expirationPolicy is toggled to NEVER expire.
  459. if dur != 0 {
  460. ttl = ptypes.DurationProto(dur)
  461. }
  462. return &pb.ExpirationPolicy{
  463. Ttl: ttl,
  464. }
  465. }
  466. // IAM returns the subscription's IAM handle.
  467. func (s *Subscription) IAM() *iam.Handle {
  468. return iam.InternalNewHandle(s.c.subc.Connection(), s.name)
  469. }
  470. // CreateSubscription creates a new subscription on a topic.
  471. //
  472. // id is the name of the subscription to create. It must start with a letter,
  473. // and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-),
  474. // underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It
  475. // must be between 3 and 255 characters in length, and must not start with
  476. // "goog".
  477. //
  478. // cfg.Topic is the topic from which the subscription should receive messages. It
  479. // need not belong to the same project as the subscription. This field is required.
  480. //
  481. // cfg.AckDeadline is the maximum time after a subscriber receives a message before
  482. // the subscriber should acknowledge the message. It must be between 10 and 600
  483. // seconds (inclusive), and is rounded down to the nearest second. If the
  484. // provided ackDeadline is 0, then the default value of 10 seconds is used.
  485. // Note: messages which are obtained via Subscription.Receive need not be
  486. // acknowledged within this deadline, as the deadline will be automatically
  487. // extended.
  488. //
  489. // cfg.PushConfig may be set to configure this subscription for push delivery.
  490. //
  491. // If the subscription already exists an error will be returned.
  492. func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) {
  493. if cfg.Topic == nil {
  494. return nil, errors.New("pubsub: require non-nil Topic")
  495. }
  496. if cfg.AckDeadline == 0 {
  497. cfg.AckDeadline = 10 * time.Second
  498. }
  499. if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second {
  500. return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d)
  501. }
  502. sub := c.Subscription(id)
  503. _, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name))
  504. if err != nil {
  505. return nil, err
  506. }
  507. return sub, nil
  508. }
  // errReceiveInProgress is returned by Receive when another Receive call is
  // already active on the same Subscription.
  var errReceiveInProgress = errors.New(
  	"pubsub: Receive already in progress for this subscription",
  )
  510. // Receive calls f with the outstanding messages from the subscription.
  511. // It blocks until ctx is done, or the service returns a non-retryable error.
  512. //
  513. // The standard way to terminate a Receive is to cancel its context:
  514. //
  515. // cctx, cancel := context.WithCancel(ctx)
  516. // err := sub.Receive(cctx, callback)
  517. // // Call cancel from callback, or another goroutine.
  518. //
  519. // If the service returns a non-retryable error, Receive returns that error after
  520. // all of the outstanding calls to f have returned. If ctx is done, Receive
  521. // returns nil after all of the outstanding calls to f have returned and
  522. // all messages have been acknowledged or have expired.
  523. //
  524. // Receive calls f concurrently from multiple goroutines. It is encouraged to
  525. // process messages synchronously in f, even if that processing is relatively
  526. // time-consuming; Receive will spawn new goroutines for incoming messages,
  527. // limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
  528. //
  529. // The context passed to f will be canceled when ctx is Done or there is a
  530. // fatal service error.
  531. //
  532. // Receive will send an ack deadline extension on message receipt, then
  533. // automatically extend the ack deadline of all fetched Messages up to the
  534. // period specified by s.ReceiveSettings.MaxExtension.
  535. //
  536. // Each Subscription may have only one invocation of Receive active at a time.
  537. func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error {
  538. s.mu.Lock()
  539. if s.receiveActive {
  540. s.mu.Unlock()
  541. return errReceiveInProgress
  542. }
  543. s.receiveActive = true
  544. s.mu.Unlock()
  545. defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()
  546. maxCount := s.ReceiveSettings.MaxOutstandingMessages
  547. if maxCount == 0 {
  548. maxCount = DefaultReceiveSettings.MaxOutstandingMessages
  549. }
  550. maxBytes := s.ReceiveSettings.MaxOutstandingBytes
  551. if maxBytes == 0 {
  552. maxBytes = DefaultReceiveSettings.MaxOutstandingBytes
  553. }
  554. maxExt := s.ReceiveSettings.MaxExtension
  555. if maxExt == 0 {
  556. maxExt = DefaultReceiveSettings.MaxExtension
  557. } else if maxExt < 0 {
  558. // If MaxExtension is negative, disable automatic extension.
  559. maxExt = 0
  560. }
  561. var numGoroutines int
  562. switch {
  563. case s.ReceiveSettings.Synchronous:
  564. numGoroutines = 1
  565. case s.ReceiveSettings.NumGoroutines >= 1:
  566. numGoroutines = s.ReceiveSettings.NumGoroutines
  567. default:
  568. numGoroutines = DefaultReceiveSettings.NumGoroutines
  569. }
  570. // TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
  571. po := &pullOptions{
  572. maxExtension: maxExt,
  573. maxPrefetch: trunc32(int64(maxCount)),
  574. synchronous: s.ReceiveSettings.Synchronous,
  575. }
  576. fc := newFlowController(maxCount, maxBytes)
  577. // Wait for all goroutines started by Receive to return, so instead of an
  578. // obscure goroutine leak we have an obvious blocked call to Receive.
  579. group, gctx := errgroup.WithContext(ctx)
  580. for i := 0; i < numGoroutines; i++ {
  581. group.Go(func() error {
  582. return s.receive(gctx, po, fc, f)
  583. })
  584. }
  585. return group.Wait()
  586. }
  587. func (s *Subscription) receive(ctx context.Context, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error {
  588. // Cancel a sub-context when we return, to kick the context-aware callbacks
  589. // and the goroutine below.
  590. ctx2, cancel := context.WithCancel(ctx)
  591. // The iterator does not use the context passed to Receive. If it did, canceling
  592. // that context would immediately stop the iterator without waiting for unacked
  593. // messages.
  594. iter := newMessageIterator(s.c.subc, s.name, po)
  595. // We cannot use errgroup from Receive here. Receive might already be calling group.Wait,
  596. // and group.Wait cannot be called concurrently with group.Go. We give each receive() its
  597. // own WaitGroup instead.
  598. // Since wg.Add is only called from the main goroutine, wg.Wait is guaranteed
  599. // to be called after all Adds.
  600. var wg sync.WaitGroup
  601. wg.Add(1)
  602. go func() {
  603. <-ctx2.Done()
  604. // Call stop when Receive's context is done.
  605. // Stop will block until all outstanding messages have been acknowledged
  606. // or there was a fatal service error.
  607. iter.stop()
  608. wg.Done()
  609. }()
  610. defer wg.Wait()
  611. defer cancel()
  612. for {
  613. var maxToPull int32 // maximum number of messages to pull
  614. if po.synchronous {
  615. if po.maxPrefetch < 0 {
  616. // If there is no limit on the number of messages to pull, use a reasonable default.
  617. maxToPull = 1000
  618. } else {
  619. // Limit the number of messages in memory to MaxOutstandingMessages
  620. // (here, po.maxPrefetch). For each message currently in memory, we have
  621. // called fc.acquire but not fc.release: this is fc.count(). The next
  622. // call to Pull should fetch no more than the difference between these
  623. // values.
  624. maxToPull = po.maxPrefetch - int32(fc.count())
  625. if maxToPull <= 0 {
  626. // Wait for some callbacks to finish.
  627. if err := gax.Sleep(ctx, synchronousWaitTime); err != nil {
  628. // Return nil if the context is done, not err.
  629. return nil
  630. }
  631. continue
  632. }
  633. }
  634. }
  635. msgs, err := iter.receive(maxToPull)
  636. if err == io.EOF {
  637. return nil
  638. }
  639. if err != nil {
  640. return err
  641. }
  642. for i, msg := range msgs {
  643. msg := msg
  644. // TODO(jba): call acquire closer to when the message is allocated.
  645. if err := fc.acquire(ctx, len(msg.Data)); err != nil {
  646. // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
  647. for _, m := range msgs[i:] {
  648. m.Nack()
  649. }
  650. // Return nil if the context is done, not err.
  651. return nil
  652. }
  653. old := msg.doneFunc
  654. msgLen := len(msg.Data)
  655. msg.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
  656. defer fc.release(msgLen)
  657. old(ackID, ack, receiveTime)
  658. }
  659. wg.Add(1)
  660. go func() {
  661. defer wg.Done()
  662. f(ctx2, msg)
  663. }()
  664. }
  665. }
  666. }
  // pullOptions carries the resolved settings that Receive hands to each pull
  // loop and its message iterator.
  type pullOptions struct {
  	// maxExtension is the effective MaxExtension; Receive sets it to zero to
  	// disable automatic ack deadline extension.
  	maxExtension time.Duration

  	// maxPrefetch is the effective MaxOutstandingMessages, truncated to 32
  	// bits.
  	maxPrefetch int32

  	// If true, use unary Pull instead of StreamingPull, and never pull more
  	// than maxPrefetch messages.
  	synchronous bool
  }