|
- // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // Source code and contact info at http://github.com/streadway/amqp
-
- package amqp
-
- import (
- "bufio"
- "bytes"
- "encoding/binary"
- "errors"
- "io"
- "math"
- "time"
- )
-
- func (w *writer) WriteFrame(frame frame) (err error) {
- if err = frame.write(w.w); err != nil {
- return
- }
-
- if buf, ok := w.w.(*bufio.Writer); ok {
- err = buf.Flush()
- }
-
- return
- }
-
- func (f *methodFrame) write(w io.Writer) (err error) {
- var payload bytes.Buffer
-
- if f.Method == nil {
- return errors.New("malformed frame: missing method")
- }
-
- class, method := f.Method.id()
-
- if err = binary.Write(&payload, binary.BigEndian, class); err != nil {
- return
- }
-
- if err = binary.Write(&payload, binary.BigEndian, method); err != nil {
- return
- }
-
- if err = f.Method.write(&payload); err != nil {
- return
- }
-
- return writeFrame(w, frameMethod, f.ChannelId, payload.Bytes())
- }
-
- // Heartbeat
- //
- // Payload is empty
- func (f *heartbeatFrame) write(w io.Writer) (err error) {
- return writeFrame(w, frameHeartbeat, f.ChannelId, []byte{})
- }
-
- // CONTENT HEADER
- // 0 2 4 12 14
- // +----------+--------+-----------+----------------+------------- - -
- // | class-id | weight | body size | property flags | property list...
- // +----------+--------+-----------+----------------+------------- - -
- // short short long long short remainder...
- //
- func (f *headerFrame) write(w io.Writer) (err error) {
- var payload bytes.Buffer
- var zeroTime time.Time
-
- if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil {
- return
- }
-
- if err = binary.Write(&payload, binary.BigEndian, f.weight); err != nil {
- return
- }
-
- if err = binary.Write(&payload, binary.BigEndian, f.Size); err != nil {
- return
- }
-
- // First pass will build the mask to be serialized, second pass will serialize
- // each of the fields that appear in the mask.
-
- var mask uint16
-
- if len(f.Properties.ContentType) > 0 {
- mask = mask | flagContentType
- }
- if len(f.Properties.ContentEncoding) > 0 {
- mask = mask | flagContentEncoding
- }
- if f.Properties.Headers != nil && len(f.Properties.Headers) > 0 {
- mask = mask | flagHeaders
- }
- if f.Properties.DeliveryMode > 0 {
- mask = mask | flagDeliveryMode
- }
- if f.Properties.Priority > 0 {
- mask = mask | flagPriority
- }
- if len(f.Properties.CorrelationId) > 0 {
- mask = mask | flagCorrelationId
- }
- if len(f.Properties.ReplyTo) > 0 {
- mask = mask | flagReplyTo
- }
- if len(f.Properties.Expiration) > 0 {
- mask = mask | flagExpiration
- }
- if len(f.Properties.MessageId) > 0 {
- mask = mask | flagMessageId
- }
- if f.Properties.Timestamp != zeroTime {
- mask = mask | flagTimestamp
- }
- if len(f.Properties.Type) > 0 {
- mask = mask | flagType
- }
- if len(f.Properties.UserId) > 0 {
- mask = mask | flagUserId
- }
- if len(f.Properties.AppId) > 0 {
- mask = mask | flagAppId
- }
-
- if err = binary.Write(&payload, binary.BigEndian, mask); err != nil {
- return
- }
-
- if hasProperty(mask, flagContentType) {
- if err = writeShortstr(&payload, f.Properties.ContentType); err != nil {
- return
- }
- }
- if hasProperty(mask, flagContentEncoding) {
- if err = writeShortstr(&payload, f.Properties.ContentEncoding); err != nil {
- return
- }
- }
- if hasProperty(mask, flagHeaders) {
- if err = writeTable(&payload, f.Properties.Headers); err != nil {
- return
- }
- }
- if hasProperty(mask, flagDeliveryMode) {
- if err = binary.Write(&payload, binary.BigEndian, f.Properties.DeliveryMode); err != nil {
- return
- }
- }
- if hasProperty(mask, flagPriority) {
- if err = binary.Write(&payload, binary.BigEndian, f.Properties.Priority); err != nil {
- return
- }
- }
- if hasProperty(mask, flagCorrelationId) {
- if err = writeShortstr(&payload, f.Properties.CorrelationId); err != nil {
- return
- }
- }
- if hasProperty(mask, flagReplyTo) {
- if err = writeShortstr(&payload, f.Properties.ReplyTo); err != nil {
- return
- }
- }
- if hasProperty(mask, flagExpiration) {
- if err = writeShortstr(&payload, f.Properties.Expiration); err != nil {
- return
- }
- }
- if hasProperty(mask, flagMessageId) {
- if err = writeShortstr(&payload, f.Properties.MessageId); err != nil {
- return
- }
- }
- if hasProperty(mask, flagTimestamp) {
- if err = binary.Write(&payload, binary.BigEndian, uint64(f.Properties.Timestamp.Unix())); err != nil {
- return
- }
- }
- if hasProperty(mask, flagType) {
- if err = writeShortstr(&payload, f.Properties.Type); err != nil {
- return
- }
- }
- if hasProperty(mask, flagUserId) {
- if err = writeShortstr(&payload, f.Properties.UserId); err != nil {
- return
- }
- }
- if hasProperty(mask, flagAppId) {
- if err = writeShortstr(&payload, f.Properties.AppId); err != nil {
- return
- }
- }
-
- return writeFrame(w, frameHeader, f.ChannelId, payload.Bytes())
- }
-
- // Body
- //
- // Payload is one byterange from the full body who's size is declared in the
- // Header frame
- func (f *bodyFrame) write(w io.Writer) (err error) {
- return writeFrame(w, frameBody, f.ChannelId, f.Body)
- }
-
- func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err error) {
- end := []byte{frameEnd}
- size := uint(len(payload))
-
- _, err = w.Write([]byte{
- byte(typ),
- byte((channel & 0xff00) >> 8),
- byte((channel & 0x00ff) >> 0),
- byte((size & 0xff000000) >> 24),
- byte((size & 0x00ff0000) >> 16),
- byte((size & 0x0000ff00) >> 8),
- byte((size & 0x000000ff) >> 0),
- })
-
- if err != nil {
- return
- }
-
- if _, err = w.Write(payload); err != nil {
- return
- }
-
- if _, err = w.Write(end); err != nil {
- return
- }
-
- return
- }
-
- func writeShortstr(w io.Writer, s string) (err error) {
- b := []byte(s)
-
- var length = uint8(len(b))
-
- if err = binary.Write(w, binary.BigEndian, length); err != nil {
- return
- }
-
- if _, err = w.Write(b[:length]); err != nil {
- return
- }
-
- return
- }
-
- func writeLongstr(w io.Writer, s string) (err error) {
- b := []byte(s)
-
- var length = uint32(len(b))
-
- if err = binary.Write(w, binary.BigEndian, length); err != nil {
- return
- }
-
- if _, err = w.Write(b[:length]); err != nil {
- return
- }
-
- return
- }
-
- /*
- 'A': []interface{}
- 'D': Decimal
- 'F': Table
- 'I': int32
- 'S': string
- 'T': time.Time
- 'V': nil
- 'b': byte
- 'd': float64
- 'f': float32
- 'l': int64
- 's': int16
- 't': bool
- 'x': []byte
- */
- func writeField(w io.Writer, value interface{}) (err error) {
- var buf [9]byte
- var enc []byte
-
- switch v := value.(type) {
- case bool:
- buf[0] = 't'
- if v {
- buf[1] = byte(1)
- } else {
- buf[1] = byte(0)
- }
- enc = buf[:2]
-
- case byte:
- buf[0] = 'b'
- buf[1] = byte(v)
- enc = buf[:2]
-
- case int16:
- buf[0] = 's'
- binary.BigEndian.PutUint16(buf[1:3], uint16(v))
- enc = buf[:3]
-
- case int:
- buf[0] = 'I'
- binary.BigEndian.PutUint32(buf[1:5], uint32(v))
- enc = buf[:5]
-
- case int32:
- buf[0] = 'I'
- binary.BigEndian.PutUint32(buf[1:5], uint32(v))
- enc = buf[:5]
-
- case int64:
- buf[0] = 'l'
- binary.BigEndian.PutUint64(buf[1:9], uint64(v))
- enc = buf[:9]
-
- case float32:
- buf[0] = 'f'
- binary.BigEndian.PutUint32(buf[1:5], math.Float32bits(v))
- enc = buf[:5]
-
- case float64:
- buf[0] = 'd'
- binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v))
- enc = buf[:9]
-
- case Decimal:
- buf[0] = 'D'
- buf[1] = byte(v.Scale)
- binary.BigEndian.PutUint32(buf[2:6], uint32(v.Value))
- enc = buf[:6]
-
- case string:
- buf[0] = 'S'
- binary.BigEndian.PutUint32(buf[1:5], uint32(len(v)))
- enc = append(buf[:5], []byte(v)...)
-
- case []interface{}: // field-array
- buf[0] = 'A'
-
- sec := new(bytes.Buffer)
- for _, val := range v {
- if err = writeField(sec, val); err != nil {
- return
- }
- }
-
- binary.BigEndian.PutUint32(buf[1:5], uint32(sec.Len()))
- if _, err = w.Write(buf[:5]); err != nil {
- return
- }
-
- if _, err = w.Write(sec.Bytes()); err != nil {
- return
- }
-
- return
-
- case time.Time:
- buf[0] = 'T'
- binary.BigEndian.PutUint64(buf[1:9], uint64(v.Unix()))
- enc = buf[:9]
-
- case Table:
- if _, err = w.Write([]byte{'F'}); err != nil {
- return
- }
- return writeTable(w, v)
-
- case []byte:
- buf[0] = 'x'
- binary.BigEndian.PutUint32(buf[1:5], uint32(len(v)))
- if _, err = w.Write(buf[0:5]); err != nil {
- return
- }
- if _, err = w.Write(v); err != nil {
- return
- }
- return
-
- case nil:
- buf[0] = 'V'
- enc = buf[:1]
-
- default:
- return ErrFieldType
- }
-
- _, err = w.Write(enc)
-
- return
- }
-
- func writeTable(w io.Writer, table Table) (err error) {
- var buf bytes.Buffer
-
- for key, val := range table {
- if err = writeShortstr(&buf, key); err != nil {
- return
- }
- if err = writeField(&buf, val); err != nil {
- return
- }
- }
-
- return writeLongstr(w, string(buf.Bytes()))
- }
|