- package mqtt
- import (
- "bytes"
- "io"
- "reflect"
- "testing"
- gbt "github.com/huin/gobinarytest"
- )
- type fakeSizePayload int
- func (p fakeSizePayload) Size() int {
- return int(p)
- }
- func (p fakeSizePayload) WritePayload(w io.Writer) error {
- return nil
- }
- func (p fakeSizePayload) ReadPayload(r io.Reader) error {
- return nil
- }
- type fakeDecoderConfig struct{}
- func (c fakeDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error) {
- return fakeSizePayload(n), nil
- }
- func TestEncodeDecode(t *testing.T) {
- tests := []struct {
- Comment string
- DecoderConfig DecoderConfig
- Msg Message
- Expected gbt.Matcher
- NoEncodeTest bool
- }{
- {
- Comment: "CONNECT message",
- Msg: &Connect{
- ProtocolName: "MQIsdp",
- ProtocolVersion: 3,
- UsernameFlag: true,
- PasswordFlag: true,
- WillRetain: false,
- WillQos: 1,
- WillFlag: true,
- CleanSession: true,
- KeepAliveTimer: 10,
- ClientId: "xixihaha",
- WillTopic: "topic",
- WillMessage: "message",
- Username: "name",
- Password: "pwd",
- },
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x10}},
- gbt.Named{"Remaining length", gbt.Literal{12 + 5*2 + 8 + 5 + 7 + 4 + 3}},
- // Extended headers for CONNECT:
- gbt.Named{"Protocol name", gbt.InOrder{gbt.Literal{0x00, 0x06}, gbt.Literal("MQIsdp")}},
- gbt.Named{
- "Extended headers for CONNECT",
- gbt.Literal{
- 0x03, // Protocol version number
- 0xce, // Connect flags
- 0x00, 0x0a, // Keep alive timer
- },
- },
- // CONNECT payload:
- gbt.Named{"Client identifier", gbt.InOrder{gbt.Literal{0x00, 0x08}, gbt.Literal("xixihaha")}},
- gbt.Named{"Will topic", gbt.InOrder{gbt.Literal{0x00, 0x05}, gbt.Literal("topic")}},
- gbt.Named{"Will message", gbt.InOrder{gbt.Literal{0x00, 0x07}, gbt.Literal("message")}},
- gbt.Named{"Username", gbt.InOrder{gbt.Literal{0x00, 0x04}, gbt.Literal("name")}},
- gbt.Named{"Password", gbt.InOrder{gbt.Literal{0x00, 0x03}, gbt.Literal("pwd")}},
- },
- },
- {
- Comment: "CONNACK message",
- Msg: &ConnAck{
- ReturnCode: RetCodeBadUsernameOrPassword,
- },
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x20}},
- gbt.Named{"Remaining length", gbt.Literal{2}},
- gbt.Named{"Reserved byte", gbt.Literal{0}},
- gbt.Named{"Return code", gbt.Literal{4}},
- },
- },
- {
- Comment: "PUBLISH message with QoS = QosAtMostOnce",
- Msg: &Publish{
- Header: Header{
- DupFlag: false,
- QosLevel: QosAtMostOnce,
- Retain: false,
- },
- TopicName: "a/b",
- Payload: BytesPayload{1, 2, 3},
- },
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x30}},
- gbt.Named{"Remaining length", gbt.Literal{5 + 3}},
- gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
- // No MessageId should be present.
- gbt.Named{"Data", gbt.Literal{1, 2, 3}},
- },
- },
- {
- Comment: "PUBLISH message with QoS = QosAtLeastOnce",
- Msg: &Publish{
- Header: Header{
- DupFlag: true,
- QosLevel: QosAtLeastOnce,
- Retain: false,
- },
- TopicName: "a/b",
- MessageId: 0x1234,
- Payload: BytesPayload{1, 2, 3},
- },
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x3a}},
- gbt.Named{"Remaining length", gbt.Literal{7 + 3}},
- gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
- gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
- gbt.Named{"Data", gbt.Literal{1, 2, 3}},
- },
- },
- {
- Comment: "PUBLISH message with maximum size payload",
- DecoderConfig: fakeDecoderConfig{},
- Msg: &Publish{
- Header: Header{
- DupFlag: false,
- QosLevel: QosAtMostOnce,
- Retain: false,
- },
- TopicName: "a/b",
- Payload: fakeSizePayload(MaxPayloadSize - 5),
- },
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x30}},
- gbt.Named{"Remaining length", gbt.Literal{0xff, 0xff, 0xff, 0x7f}},
- gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
- // Our fake payload doesn't write any data, so no data should appear here.
- },
- },
- {
- Comment: "PUBACK message",
- Msg: &PubAck{MessageId: 0x1234},
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x40}},
- gbt.Named{"Remaining length", gbt.Literal{2}},
- gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
- },
- },
- {
- Comment: "PUBREC message",
- Msg: &PubRec{MessageId: 0x1234},
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x50}},
- gbt.Named{"Remaining length", gbt.Literal{2}},
- gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
- },
- },
- {
- Comment: "PUBREL message",
- Msg: &PubRel{MessageId: 0x1234},
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x60}},
- gbt.Named{"Remaining length", gbt.Literal{2}},
- gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
- },
- },
- {
- Comment: "PUBCOMP message",
- Msg: &PubComp{MessageId: 0x1234},
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x70}},
- gbt.Named{"Remaining length", gbt.Literal{2}},
- gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
- },
- },
- {
- Comment: "SUBSCRIBE message",
- Msg: &Subscribe{
- Header: Header{
- DupFlag: false,
- QosLevel: QosAtLeastOnce,
- },
- MessageId: 0x4321,
- Topics: []TopicQos{
- {"a/b", QosAtLeastOnce},
- {"c/d", QosExactlyOnce},
- },
- },
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x82}},
- gbt.Named{"Remaining length", gbt.Literal{2 + 5 + 1 + 5 + 1}},
- gbt.Named{"MessageId", gbt.Literal{0x43, 0x21}},
- gbt.Named{"First topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
- gbt.Named{"First topic QoS", gbt.Literal{1}},
- gbt.Named{"Second topic", gbt.Literal{0x00, 0x03, 'c', '/', 'd'}},
- gbt.Named{"Second topic QoS", gbt.Literal{2}},
- },
- },
- {
- Comment: "SUBACK message",
- Msg: &SubAck{
- MessageId: 0x1234,
- TopicsQos: []QosLevel{QosAtMostOnce, QosExactlyOnce},
- },
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x90}},
- gbt.Named{"Remaining length", gbt.Literal{4}},
- gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
- gbt.Named{"TopicsQos", gbt.Literal{0x00, 0x02}},
- },
- },
- {
- Comment: "UNSUBSCRIBE message",
- Msg: &Unsubscribe{
- Header: Header{
- DupFlag: false,
- QosLevel: QosAtLeastOnce,
- },
- MessageId: 0x4321,
- Topics: []string{"a/b", "c/d"},
- },
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0xa2}},
- gbt.Named{"Remaining length", gbt.Literal{2 + 5 + 5}},
- gbt.Named{"MessageId", gbt.Literal{0x43, 0x21}},
- gbt.Named{"First topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
- gbt.Named{"Second topic", gbt.Literal{0x00, 0x03, 'c', '/', 'd'}},
- },
- },
- {
- Comment: "UNSUBACK message",
- Msg: &UnsubAck{MessageId: 0x1234},
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0xb0}},
- gbt.Named{"Remaining length", gbt.Literal{2}},
- gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
- },
- },
- {
- Comment: "PINGREQ message",
- Msg: &PingReq{},
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0xc0}},
- gbt.Named{"Remaining length", gbt.Literal{0}},
- },
- },
- {
- Comment: "PINGRESP message",
- Msg: &PingResp{},
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0xd0}},
- gbt.Named{"Remaining length", gbt.Literal{0}},
- },
- },
- {
- Comment: "DISCONNECT message",
- Msg: &Disconnect{},
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0xe0}},
- gbt.Named{"Remaining length", gbt.Literal{0}},
- },
- },
- }
- for _, test := range tests {
- {
- // Test decoding.
- expectedBuf := new(bytes.Buffer)
- test.Expected.Write(expectedBuf)
- if decodedMsg, err := DecodeOneMessage(expectedBuf, test.DecoderConfig); err != nil {
- t.Errorf("%s: Unexpected error during decoding: %v", test.Comment, err)
- } else if !reflect.DeepEqual(test.Msg, decodedMsg) {
- t.Errorf("%s: Decoded value mismatch\n got = %#v\nexpected = %#v",
- test.Comment, decodedMsg, test.Msg)
- }
- }
- if !test.NoEncodeTest {
- // Test encoding.
- encodedBuf := new(bytes.Buffer)
- if err := test.Msg.Encode(encodedBuf); err != nil {
- t.Errorf("%s: Unexpected error during encoding: %v", test.Comment, err)
- } else if err = gbt.Matches(test.Expected, encodedBuf.Bytes()); err != nil {
- t.Errorf("%s: Unexpected encoding output: %v", test.Comment, err)
- }
- }
- }
- }
- func TestErrorEncode(t *testing.T) {
- tests := []struct {
- Comment string
- Msg Message
- }{
- {
- Comment: "Payload reports Size() that's too large for MQTT payload.",
- Msg: &Publish{
- TopicName: "big/message",
- MessageId: 0x1234,
- // MaxPayloadSize-5 is too large - the payload space is further
- // restricted by the variable header.
- Payload: fakeSizePayload(MaxPayloadSize),
- },
- },
- {
- Comment: "Payload reports Size() that would overflow when added to variable header size.",
- Msg: &Publish{
- TopicName: "big/message",
- MessageId: 0x1234,
- Payload: fakeSizePayload(0x7fffffff),
- },
- },
- }
- for _, test := range tests {
- encodedBuf := new(bytes.Buffer)
- if err := test.Msg.Encode(encodedBuf); err == nil {
- t.Errorf("%s: Expected error during encoding, but got nil.", test.Comment)
- }
- }
- }
- func TestErrorDecode(t *testing.T) {
- tests := []struct {
- Comment string
- Expected gbt.Matcher
- }{
- {
- Comment: "Immediate EOF",
- Expected: gbt.Literal{},
- },
- {
- Comment: "EOF at 1 byte",
- Expected: gbt.Literal{0x10},
- },
- {
- Comment: "PUBACK message with too short a length",
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x40}},
- gbt.Named{"Remaining length", gbt.Literal{1}},
- gbt.Named{"Truncated MessageId", gbt.Literal{0x12}},
- },
- },
- {
- Comment: "PUBACK message with too long a length",
- Expected: gbt.InOrder{
- gbt.Named{"Header byte", gbt.Literal{0x40}},
- gbt.Named{"Remaining length", gbt.Literal{3}},
- gbt.Named{"Truncated MessageId", gbt.Literal{0x12, 0x34, 0x56}},
- },
- },
- }
- for _, test := range tests {
- expectedBuf := new(bytes.Buffer)
- test.Expected.Write(expectedBuf)
- if _, err := DecodeOneMessage(expectedBuf, nil); err == nil {
- t.Errorf("%s: Expected error during decoding, but got nil.", test.Comment)
- }
- }
- }
- func TestLengthEncodeDecode(t *testing.T) {
- tests := []struct {
- Value int32
- Encoded gbt.Matcher
- }{
- {0, gbt.Literal{0}},
- {1, gbt.Literal{1}},
- {20, gbt.Literal{20}},
- // Boundary conditions used as tests taken from MQTT 3.1 spec.
- {0, gbt.Literal{0x00}},
- {127, gbt.Literal{0x7F}},
- {128, gbt.Literal{0x80, 0x01}},
- {16383, gbt.Literal{0xFF, 0x7F}},
- {16384, gbt.Literal{0x80, 0x80, 0x01}},
- {2097151, gbt.Literal{0xFF, 0xFF, 0x7F}},
- {2097152, gbt.Literal{0x80, 0x80, 0x80, 0x01}},
- {268435455, gbt.Literal{0xFF, 0xFF, 0xFF, 0x7F}},
- }
- for _, test := range tests {
- {
- // Test decoding.
- buf := new(bytes.Buffer)
- test.Encoded.Write(buf)
- buf = bytes.NewBuffer(buf.Bytes())
- if result := decodeLength(buf); test.Value != result {
- t.Errorf("Decoding test %#x: got %#x", test.Value, result)
- }
- }
- {
- // Test encoding.
- buf := new(bytes.Buffer)
- encodeLength(test.Value, buf)
- if err := gbt.Matches(test.Encoded, buf.Bytes()); err != nil {
- t.Errorf("Encoding test %#x: %v", test.Value, err)
- }
- }
- }
- }
- type SeqBytePayload struct {
- N int
- T *testing.T
- }
- func (p *SeqBytePayload) Size() int {
- return p.N
- }
- func (p *SeqBytePayload) WritePayload(w io.Writer) error {
- buf := make([]byte, 256)
- for i := range buf {
- buf[i] = byte(i)
- }
- for toWrite := p.N; toWrite > 0; toWrite -= 256 {
- writeThisTime := toWrite
- if writeThisTime > 256 {
- writeThisTime = 256
- }
- if _, err := w.Write(buf[:writeThisTime]); err != nil {
- return err
- }
- }
- return nil
- }
- func (p *SeqBytePayload) ReadPayload(r io.Reader) error {
- buf := make([]byte, 256)
- for toRead := p.N; toRead > 0; toRead -= 256 {
- readThisTime := toRead
- if readThisTime > 256 {
- readThisTime = 256
- }
- if _, err := io.ReadFull(r, buf[:readThisTime]); err != nil {
- p.T.Errorf("Got unexpected error %v", err)
- return err
- }
- for i, v := range buf[:readThisTime] {
- if v != byte(i) {
- p.T.Errorf("Got unexpected byte %#x, expected %#x", v, i)
- return io.ErrClosedPipe
- }
- }
- }
- // Any more reads should produce an EOF.
- if n, err := r.Read(buf); n > 0 {
- p.T.Errorf("Got unexpected extra %d bytes of data", n)
- } else if err != io.EOF {
- p.T.Errorf("Expected err=EOF, got %v", err)
- }
- return nil
- }
- // Stream a reasonably large Publish payload containing sequential byte values.
- func TestPipedPublish(t *testing.T) {
- r, w := io.Pipe()
- complete := make(chan bool)
- payload := &SeqBytePayload{
- N: 20 + (1 << 20),
- T: t,
- }
- go func() {
- defer func() {
- complete <- true
- w.Close()
- }()
- msg := &Publish{
- TopicName: "foo",
- Payload: payload,
- }
- if err := msg.Encode(w); err != nil {
- t.Error(err)
- }
- }()
- go func() {
- defer func() {
- complete <- true
- r.Close()
- }()
- expectedMsg := &Publish{
- TopicName: "foo",
- Payload: payload,
- }
- testConfig := &ValueConfig{payload}
- if msg, err := DecodeOneMessage(r, testConfig); err != nil {
- t.Error(err)
- } else if !reflect.DeepEqual(expectedMsg, msg) {
- t.Errorf(" got = %#v\nexpected = %#v", msg, expectedMsg)
- }
- }()
- _ = <-complete
- _ = <-complete
- }