打印

github.com/huin/mqtt项目解析

[复制链接]
7049|22
手机看帖
扫描二维码
随时随地手机跟帖
跳转到指定楼层
楼主
沙发
keer_zu|  楼主 | 2017-3-7 14:41 | 只看该作者
mqtt

An MQTT encoder and decoder,written in Golang.

This library was modified heavily from https://github.com/plucury/mqtt.go and is API-incompatible with it.

Currently the library's API is unstable.

从上面介绍可知,主要是一个mqtt的编解码器。

使用特权

评论回复
板凳
keer_zu|  楼主 | 2017-3-7 14:43 | 只看该作者
先看它的TEST : mqtt_test.go

package mqtt

import (
        "bytes"
        "io"
        "reflect"
        "testing"

        gbt "github.com/huin/gobinarytest"
)

type fakeSizePayload int

func (p fakeSizePayload) Size() int {
        return int(p)
}

func (p fakeSizePayload) WritePayload(w io.Writer) error {
        return nil
}

func (p fakeSizePayload) ReadPayload(r io.Reader) error {
        return nil
}

type fakeDecoderConfig struct{}

func (c fakeDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error) {
        return fakeSizePayload(n), nil
}

func TestEncodeDecode(t *testing.T) {
        tests := []struct {
                Comment       string
                DecoderConfig DecoderConfig
                Msg           Message
                Expected      gbt.Matcher
                NoEncodeTest  bool
        }{
                {
                        Comment: "CONNECT message",
                        Msg: &Connect{
                                ProtocolName:    "MQIsdp",
                                ProtocolVersion: 3,
                                UsernameFlag:    true,
                                PasswordFlag:    true,
                                WillRetain:      false,
                                WillQos:         1,
                                WillFlag:        true,
                                CleanSession:    true,
                                KeepAliveTimer:  10,
                                ClientId:        "xixihaha",
                                WillTopic:       "topic",
                                WillMessage:     "message",
                                Username:        "name",
                                Password:        "pwd",
                        },
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x10}},
                                gbt.Named{"Remaining length", gbt.Literal{12 + 5*2 + 8 + 5 + 7 + 4 + 3}},

                                // Extended headers for CONNECT:
                                gbt.Named{"Protocol name", gbt.InOrder{gbt.Literal{0x00, 0x06}, gbt.Literal("MQIsdp")}},
                                gbt.Named{
                                        "Extended headers for CONNECT",
                                        gbt.Literal{
                                                0x03,       // Protocol version number
                                                0xce,       // Connect flags
                                                0x00, 0x0a, // Keep alive timer
                                        },
                                },

                                // CONNECT payload:
                                gbt.Named{"Client identifier", gbt.InOrder{gbt.Literal{0x00, 0x08}, gbt.Literal("xixihaha")}},
                                gbt.Named{"Will topic", gbt.InOrder{gbt.Literal{0x00, 0x05}, gbt.Literal("topic")}},
                                gbt.Named{"Will message", gbt.InOrder{gbt.Literal{0x00, 0x07}, gbt.Literal("message")}},
                                gbt.Named{"Username", gbt.InOrder{gbt.Literal{0x00, 0x04}, gbt.Literal("name")}},
                                gbt.Named{"Password", gbt.InOrder{gbt.Literal{0x00, 0x03}, gbt.Literal("pwd")}},
                        },
                },

                {
                        Comment: "CONNACK message",
                        Msg: &ConnAck{
                                ReturnCode: RetCodeBadUsernameOrPassword,
                        },
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x20}},
                                gbt.Named{"Remaining length", gbt.Literal{2}},

                                gbt.Named{"Reserved byte", gbt.Literal{0}},
                                gbt.Named{"Return code", gbt.Literal{4}},
                        },
                },

                {
                        Comment: "PUBLISH message with QoS = QosAtMostOnce",
                        Msg: &Publish{
                                Header: Header{
                                        DupFlag:  false,
                                        QosLevel: QosAtMostOnce,
                                        Retain:   false,
                                },
                                TopicName: "a/b",
                                Payload:   BytesPayload{1, 2, 3},
                        },
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x30}},
                                gbt.Named{"Remaining length", gbt.Literal{5 + 3}},

                                gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
                                // No MessageId should be present.
                                gbt.Named{"Data", gbt.Literal{1, 2, 3}},
                        },
                },

                {
                        Comment: "PUBLISH message with QoS = QosAtLeastOnce",
                        Msg: &Publish{
                                Header: Header{
                                        DupFlag:  true,
                                        QosLevel: QosAtLeastOnce,
                                        Retain:   false,
                                },
                                TopicName: "a/b",
                                MessageId: 0x1234,
                                Payload:   BytesPayload{1, 2, 3},
                        },
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x3a}},
                                gbt.Named{"Remaining length", gbt.Literal{7 + 3}},

                                gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
                                gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
                                gbt.Named{"Data", gbt.Literal{1, 2, 3}},
                        },
                },

                {
                        Comment:       "PUBLISH message with maximum size payload",
                        DecoderConfig: fakeDecoderConfig{},
                        Msg: &Publish{
                                Header: Header{
                                        DupFlag:  false,
                                        QosLevel: QosAtMostOnce,
                                        Retain:   false,
                                },
                                TopicName: "a/b",
                                Payload:   fakeSizePayload(MaxPayloadSize - 5),
                        },
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x30}},
                                gbt.Named{"Remaining length", gbt.Literal{0xff, 0xff, 0xff, 0x7f}},

                                gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
                                // Our fake payload doesn't write any data, so no data should appear here.
                        },
                },

                {
                        Comment: "PUBACK message",
                        Msg:     &PubAck{MessageId: 0x1234},
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x40}},
                                gbt.Named{"Remaining length", gbt.Literal{2}},
                                gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
                        },
                },

                {
                        Comment: "PUBREC message",
                        Msg:     &PubRec{MessageId: 0x1234},
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x50}},
                                gbt.Named{"Remaining length", gbt.Literal{2}},
                                gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
                        },
                },

                {
                        Comment: "PUBREL message",
                        Msg:     &PubRel{MessageId: 0x1234},
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x60}},
                                gbt.Named{"Remaining length", gbt.Literal{2}},
                                gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
                        },
                },

                {
                        Comment: "PUBCOMP message",
                        Msg:     &PubComp{MessageId: 0x1234},
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x70}},
                                gbt.Named{"Remaining length", gbt.Literal{2}},
                                gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
                        },
                },

                {
                        Comment: "SUBSCRIBE message",
                        Msg: &Subscribe{
                                Header: Header{
                                        DupFlag:  false,
                                        QosLevel: QosAtLeastOnce,
                                },
                                MessageId: 0x4321,
                                Topics: []TopicQos{
                                        {"a/b", QosAtLeastOnce},
                                        {"c/d", QosExactlyOnce},
                                },
                        },
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x82}},
                                gbt.Named{"Remaining length", gbt.Literal{2 + 5 + 1 + 5 + 1}},

                                gbt.Named{"MessageId", gbt.Literal{0x43, 0x21}},
                                gbt.Named{"First topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
                                gbt.Named{"First topic QoS", gbt.Literal{1}},
                                gbt.Named{"Second topic", gbt.Literal{0x00, 0x03, 'c', '/', 'd'}},
                                gbt.Named{"Second topic QoS", gbt.Literal{2}},
                        },
                },

                {
                        Comment: "SUBACK message",
                        Msg: &SubAck{
                                MessageId: 0x1234,
                                TopicsQos: []QosLevel{QosAtMostOnce, QosExactlyOnce},
                        },
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x90}},
                                gbt.Named{"Remaining length", gbt.Literal{4}},
                                gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
                                gbt.Named{"TopicsQos", gbt.Literal{0x00, 0x02}},
                        },
                },

                {
                        Comment: "UNSUBSCRIBE message",
                        Msg: &Unsubscribe{
                                Header: Header{
                                        DupFlag:  false,
                                        QosLevel: QosAtLeastOnce,
                                },
                                MessageId: 0x4321,
                                Topics:    []string{"a/b", "c/d"},
                        },
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0xa2}},
                                gbt.Named{"Remaining length", gbt.Literal{2 + 5 + 5}},

                                gbt.Named{"MessageId", gbt.Literal{0x43, 0x21}},
                                gbt.Named{"First topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
                                gbt.Named{"Second topic", gbt.Literal{0x00, 0x03, 'c', '/', 'd'}},
                        },
                },

                {
                        Comment: "UNSUBACK message",
                        Msg:     &UnsubAck{MessageId: 0x1234},
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0xb0}},
                                gbt.Named{"Remaining length", gbt.Literal{2}},
                                gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
                        },
                },

                {
                        Comment: "PINGREQ message",
                        Msg:     &PingReq{},
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0xc0}},
                                gbt.Named{"Remaining length", gbt.Literal{0}},
                        },
                },

                {
                        Comment: "PINGRESP message",
                        Msg:     &PingResp{},
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0xd0}},
                                gbt.Named{"Remaining length", gbt.Literal{0}},
                        },
                },

                {
                        Comment: "DISCONNECT message",
                        Msg:     &Disconnect{},
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0xe0}},
                                gbt.Named{"Remaining length", gbt.Literal{0}},
                        },
                },
        }

        for _, test := range tests {
                {
                        // Test decoding.
                        expectedBuf := new(bytes.Buffer)
                        test.Expected.Write(expectedBuf)

                        if decodedMsg, err := DecodeOneMessage(expectedBuf, test.DecoderConfig); err != nil {
                                t.Errorf("%s: Unexpected error during decoding: %v", test.Comment, err)
                        } else if !reflect.DeepEqual(test.Msg, decodedMsg) {
                                t.Errorf("%s: Decoded value mismatch\n     got = %#v\nexpected = %#v",
                                        test.Comment, decodedMsg, test.Msg)
                        }
                }

                if !test.NoEncodeTest {
                        // Test encoding.
                        encodedBuf := new(bytes.Buffer)
                        if err := test.Msg.Encode(encodedBuf); err != nil {
                                t.Errorf("%s: Unexpected error during encoding: %v", test.Comment, err)
                        } else if err = gbt.Matches(test.Expected, encodedBuf.Bytes()); err != nil {
                                t.Errorf("%s: Unexpected encoding output: %v", test.Comment, err)
                        }
                }
        }
}

func TestErrorEncode(t *testing.T) {
        tests := []struct {
                Comment string
                Msg     Message
        }{
                {
                        Comment: "Payload reports Size() that's too large for MQTT payload.",
                        Msg: &Publish{
                                TopicName: "big/message",
                                MessageId: 0x1234,
                                // MaxPayloadSize-5 is too large - the payload space is further
                                // restricted by the variable header.
                                Payload: fakeSizePayload(MaxPayloadSize),
                        },
                },
                {
                        Comment: "Payload reports Size() that would overflow when added to variable header size.",
                        Msg: &Publish{
                                TopicName: "big/message",
                                MessageId: 0x1234,
                                Payload:   fakeSizePayload(0x7fffffff),
                        },
                },
        }

        for _, test := range tests {
                encodedBuf := new(bytes.Buffer)
                if err := test.Msg.Encode(encodedBuf); err == nil {
                        t.Errorf("%s: Expected error during encoding, but got nil.", test.Comment)
                }
        }
}

func TestErrorDecode(t *testing.T) {
        tests := []struct {
                Comment  string
                Expected gbt.Matcher
        }{
                {
                        Comment:  "Immediate EOF",
                        Expected: gbt.Literal{},
                },
                {
                        Comment:  "EOF at 1 byte",
                        Expected: gbt.Literal{0x10},
                },
                {
                        Comment: "PUBACK message with too short a length",
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x40}},
                                gbt.Named{"Remaining length", gbt.Literal{1}},

                                gbt.Named{"Truncated MessageId", gbt.Literal{0x12}},
                        },
                },
                {
                        Comment: "PUBACK message with too long a length",
                        Expected: gbt.InOrder{
                                gbt.Named{"Header byte", gbt.Literal{0x40}},
                                gbt.Named{"Remaining length", gbt.Literal{3}},

                                gbt.Named{"Truncated MessageId", gbt.Literal{0x12, 0x34, 0x56}},
                        },
                },
        }

        for _, test := range tests {
                expectedBuf := new(bytes.Buffer)
                test.Expected.Write(expectedBuf)

                if _, err := DecodeOneMessage(expectedBuf, nil); err == nil {
                        t.Errorf("%s: Expected error during decoding, but got nil.", test.Comment)
                }
        }
}

func TestLengthEncodeDecode(t *testing.T) {
        tests := []struct {
                Value   int32
                Encoded gbt.Matcher
        }{
                {0, gbt.Literal{0}},
                {1, gbt.Literal{1}},
                {20, gbt.Literal{20}},

                // Boundary conditions used as tests taken from MQTT 3.1 spec.
                {0, gbt.Literal{0x00}},
                {127, gbt.Literal{0x7F}},
                {128, gbt.Literal{0x80, 0x01}},
                {16383, gbt.Literal{0xFF, 0x7F}},
                {16384, gbt.Literal{0x80, 0x80, 0x01}},
                {2097151, gbt.Literal{0xFF, 0xFF, 0x7F}},
                {2097152, gbt.Literal{0x80, 0x80, 0x80, 0x01}},
                {268435455, gbt.Literal{0xFF, 0xFF, 0xFF, 0x7F}},
        }

        for _, test := range tests {
                {
                        // Test decoding.
                        buf := new(bytes.Buffer)
                        test.Encoded.Write(buf)
                        buf = bytes.NewBuffer(buf.Bytes())
                        if result := decodeLength(buf); test.Value != result {
                                t.Errorf("Decoding test %#x: got %#x", test.Value, result)
                        }
                }
                {
                        // Test encoding.
                        buf := new(bytes.Buffer)
                        encodeLength(test.Value, buf)
                        if err := gbt.Matches(test.Encoded, buf.Bytes()); err != nil {
                                t.Errorf("Encoding test %#x: %v", test.Value, err)
                        }
                }
        }
}

type SeqBytePayload struct {
        N int
        T *testing.T
}

func (p *SeqBytePayload) Size() int {
        return p.N
}

func (p *SeqBytePayload) WritePayload(w io.Writer) error {
        buf := make([]byte, 256)
        for i := range buf {
                buf[i] = byte(i)
        }

        for toWrite := p.N; toWrite > 0; toWrite -= 256 {
                writeThisTime := toWrite
                if writeThisTime > 256 {
                        writeThisTime = 256
                }
                if _, err := w.Write(buf[:writeThisTime]); err != nil {
                        return err
                }
        }
        return nil
}

func (p *SeqBytePayload) ReadPayload(r io.Reader) error {
        buf := make([]byte, 256)

        for toRead := p.N; toRead > 0; toRead -= 256 {
                readThisTime := toRead
                if readThisTime > 256 {
                        readThisTime = 256
                }
                if _, err := io.ReadFull(r, buf[:readThisTime]); err != nil {
                        p.T.Errorf("Got unexpected error %v", err)
                        return err
                }
                for i, v := range buf[:readThisTime] {
                        if v != byte(i) {
                                p.T.Errorf("Got unexpected byte %#x, expected %#x", v, i)
                                return io.ErrClosedPipe
                        }
                }
        }

        // Any more reads should produce an EOF.
        if n, err := r.Read(buf); n > 0 {
                p.T.Errorf("Got unexpected extra %d bytes of data", n)
        } else if err != io.EOF {
                p.T.Errorf("Expected err=EOF, got %v", err)
        }

        return nil
}

// Stream a reasonably large Publish payload containing sequential byte values.
func TestPipedPublish(t *testing.T) {
        r, w := io.Pipe()

        complete := make(chan bool)

        payload := &SeqBytePayload{
                N: 20 + (1 << 20),
                T: t,
        }

        go func() {
                defer func() {
                        complete <- true
                        w.Close()
                }()

                msg := &Publish{
                        TopicName: "foo",
                        Payload:   payload,
                }
                if err := msg.Encode(w); err != nil {
                        t.Error(err)
                }
        }()

        go func() {
                defer func() {
                        complete <- true
                        r.Close()
                }()

                expectedMsg := &Publish{
                        TopicName: "foo",
                        Payload:   payload,
                }

                testConfig := &ValueConfig{payload}

                if msg, err := DecodeOneMessage(r, testConfig); err != nil {
                        t.Error(err)
                } else if !reflect.DeepEqual(expectedMsg, msg) {
                        t.Errorf("     got = %#v\nexpected = %#v", msg, expectedMsg)
                }
        }()

        _ = <-complete
        _ = <-complete
}

使用特权

评论回复
地板
keer_zu|  楼主 | 2017-3-7 15:14 | 只看该作者
本帖最后由 keer_zu 于 2017-3-7 15:16 编辑
keer_zu 发表于 2017-3-7 14:43
先看它的TEST : mqtt_test.go

第28行:func (c fakeDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)

这里涉及到一个类型 Publish,下面就先来分析一下这个类型:

// Publish represents an MQTT PUBLISH message.

type Publish struct {
        Header
        TopicName string
        MessageId uint16
        Payload   Payload
}





上面是头。

3.3.2 可变报头可变报头按顺序包含主题名和报文标识符。
主题名 Topic Name主题名(Topic Name)用于识别有效载荷数据应该被发布到哪一个信息通道。
主题名必须是PUBLISH报文可变报头的第一个字段。它必须是 1.5.3节定义的UTF-8编码的字符串 [MQTT-3.3.2-1]。
PUBLISH报文中的主题名不能包含通配符 [MQTT-3.3.2-2]。
服务端发送给订阅客户端的PUBLISH报文的主题名必须匹配该订阅的主题过滤器(根据 4.7节定义的匹配过程)[MQTT-3.3.2-3]。
报文标识符 Packet Identifier只有当QoS等级是1或2时,报文标识符(Packet Identifier)字段才能出现在PUBLISH报文中。2.3.1节提供了有关报文标识符的更多信息。
可变报头非规范示例图例 3.11 – PUBLISH报文可变报头非规范示例 举例说明了 表格 3.3 - PUBLISH报文非规范示例 中简要描述的PUBLISH报文的可变报头。
表格 3.3 - PUBLISH报文非规范示例[td]
FieldValue
主题名a/b
报文标识符10





使用特权

评论回复
5
keer_zu|  楼主 | 2017-3-7 15:18 | 只看该作者
keer_zu 发表于 2017-3-7 15:14
第28行:func (c fakeDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)

...

有效载荷

有效载荷包含将被发布的应用消息。数据的内容和格式是应用特定的。有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度。包含零长度有效载荷的PUBLISH报文是合法的。

以上就是publish的三部分组成。

func (msg *Publish) Encode(w io.Writer) (err error) {
        buf := new(bytes.Buffer)

        setString(msg.TopicName, buf)
        if msg.Header.QosLevel.HasId() {
                setUint16(msg.MessageId, buf)
        }

        if err = writeMessage(w, MsgPublish, &msg.Header, buf, int32(msg.Payload.Size())); err != nil {
                return
        }

        return msg.Payload.WritePayload(w)
}

func (msg *Publish) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error) {
        defer func() {
                err = recoverError(err, recover())
        }()

        msg.Header = hdr

        msg.TopicName = getString(r, &packetRemaining)
        if msg.Header.QosLevel.HasId() {
                msg.MessageId = getUint16(r, &packetRemaining)
        }

        payloadReader := &io.LimitedReader{r, int64(packetRemaining)}

        if msg.Payload, err = config.MakePayload(msg, payloadReader, int(packetRemaining)); err != nil {
                return
        }

        return msg.Payload.ReadPayload(payloadReader)
}


该类型支持这两者方法,分别是编码和解码。

使用特权

评论回复
6
keer_zu|  楼主 | 2017-3-7 16:37 | 只看该作者
yyy71cj 发表于 2017-3-7 15:18
本想给你加个精的,一看是他人板块……

使用特权

评论回复
7
keer_zu|  楼主 | 2017-3-7 17:10 | 只看该作者

可以了解一下mqtt

使用特权

评论回复
8
keer_zu|  楼主 | 2017-3-7 21:02 | 只看该作者
可以看到整个message.go就是对:


    3.0 Contents – MQTT控制报文
    3.1 CONNECT – 连接服务端
    3.2 CONNACK – 确认连接请求
    3.3 PUBLISH – 发布消息
    3.4 PUBACK –发布确认
    3.5 PUBREC – 发布收到(QoS 2,第一步)
    3.6 PUBREL – 发布释放(QoS 2,第二步)
    3.7 PUBCOMP – 发布完成(QoS 2,第三步)
    3.8 SUBSCRIBE - 订阅主题
    3.9 SUBACK – 订阅确认
    3.10 UNSUBSCRIBE –取消订阅
    3.11 UNSUBACK – 取消订阅确认
    3.12 PINGREQ – 心跳请求
    3.13 PINGRESP – 心跳响应
    3.14 DISCONNECT –断开连接

这些消息的编解码。

使用特权

评论回复
9
keer_zu|  楼主 | 2017-3-7 21:04 | 只看该作者
encodeing.go 只是最基本的运算和转换


package mqtt

import (
        "bytes"
        "io"
)

func getUint8(r io.Reader, packetRemaining *int32) uint8 {
        if *packetRemaining < 1 {
                raiseError(dataExceedsPacketError)
        }

        var b [1]byte
        if _, err := io.ReadFull(r, b[:]); err != nil {
                raiseError(err)
        }
        *packetRemaining--

        return b[0]
}

func getUint16(r io.Reader, packetRemaining *int32) uint16 {
        if *packetRemaining < 2 {
                raiseError(dataExceedsPacketError)
        }

        var b [2]byte
        if _, err := io.ReadFull(r, b[:]); err != nil {
                raiseError(err)
        }
        *packetRemaining -= 2

        return uint16(b[0])<<8 | uint16(b[1])
}

func getString(r io.Reader, packetRemaining *int32) string {
        strLen := int(getUint16(r, packetRemaining))

        if int(*packetRemaining) < strLen {
                raiseError(dataExceedsPacketError)
        }

        b := make([]byte, strLen)
        if _, err := io.ReadFull(r, b); err != nil {
                raiseError(err)
        }
        *packetRemaining -= int32(strLen)

        return string(b)
}

func setUint8(val uint8, buf *bytes.Buffer) {
        buf.WriteByte(byte(val))
}

func setUint16(val uint16, buf *bytes.Buffer) {
        buf.WriteByte(byte(val & 0xff00 >> 8))
        buf.WriteByte(byte(val & 0x00ff))
}

func setString(val string, buf *bytes.Buffer) {
        length := uint16(len(val))
        setUint16(length, buf)
        buf.WriteString(val)
}

func boolToByte(val bool) byte {
        if val {
                return byte(1)
        }
        return byte(0)
}

func decodeLength(r io.Reader) int32 {
        var v int32
        var buf [1]byte
        var shift uint
        for i := 0; i < 4; i++ {
                if _, err := io.ReadFull(r, buf[:]); err != nil {
                        raiseError(err)
                }

                b := buf[0]
                v |= int32(b&0x7f) << shift

                if b&0x80 == 0 {
                        return v
                }
                shift += 7
        }

        raiseError(badLengthEncodingError)
        panic("unreachable")
}

func encodeLength(length int32, buf *bytes.Buffer) {
        if length == 0 {
                buf.WriteByte(0)
                return
        }
        for length > 0 {
                digit := length & 0x7f
                length = length >> 7
                if length > 0 {
                        digit = digit | 0x80
                }
                buf.WriteByte(byte(digit))
        }
}




使用特权

评论回复
10
keer_zu|  楼主 | 2017-3-7 21:11 | 只看该作者
payload.go 很短,定义了Payload接口,同时实现了BytesPayload和StreamedPayload两种类型。

他们都可以以这个接口调用,具体对外的方法有:
Size()  ----- 返回Payload的长度。
WritePayload() ------ 写Payload
ReadPayload() ------ 读Payload

代码如下:

package mqtt

import (
        "io"
)

// Payload is the interface for Publish payloads. Typically the BytesPayload
// implementation will be sufficient for small payloads whose full contents
// will exist in memory. However, other implementations can read or write
// payloads requiring them holding their complete contents in memory.
type Payload interface {
        // Size returns the number of bytes that WritePayload will write.
        Size() int

        // WritePayload writes the payload data to w. Implementations must write
        // Size() bytes of data, but it is *not* required to do so prior to
        // returning. Size() bytes must have been written to w prior to another
        // message being encoded to the underlying connection.
        WritePayload(w io.Writer) error

        // ReadPayload reads the payload data from r (r will EOF at the end of the
        // payload). It is *not* required for r to have been consumed prior to this
        // returning. r must have been consumed completely prior to another message
        // being decoded from the underlying connection.
        ReadPayload(r io.Reader) error
}

// BytesPayload reads/writes a plain slice of bytes.
type BytesPayload []byte

func (p BytesPayload) Size() int {
        return len(p)
}

func (p BytesPayload) WritePayload(w io.Writer) error {
        _, err := w.Write(p)
        return err
}

func (p BytesPayload) ReadPayload(r io.Reader) error {
        _, err := io.ReadFull(r, p)
        return err
}

// StreamedPayload writes payload data from reader, or reads payload data into a writer.
type StreamedPayload struct {
        // N indicates payload size to the encoder. This many bytes will be read from
        // the reader when encoding. The number of bytes in the payload will be
        // stored here when decoding.
        N int

        // EncodingSource is used to copy data from when encoding a Publish message
        // onto the wire. This can be
        EncodingSource io.Reader

        // DecodingSink is used to copy data to when decoding a Publish message from
        // the wire. This can be nil if the payload is only being used for encoding.
        DecodingSink io.Writer
}

func (p *StreamedPayload) Size() int {
        return p.N
}

func (p *StreamedPayload) WritePayload(w io.Writer) error {
        _, err := io.CopyN(w, p.EncodingSource, int64(p.N))
        return err
}

func (p *StreamedPayload) ReadPayload(r io.Reader) error {
        n, err := io.Copy(p.DecodingSink, r)
        p.N = int(n)
        return err
}


使用特权

评论回复
11
keer_zu|  楼主 | 2017-3-7 22:03 | 只看该作者
再来看看mqtt.go
他是在messages.go和payload.go等基础上 的再封装,可以从前面注释和后面代码看出:
他主要提供了两个API:
1. func DecodeOneMessage(r io.Reader, config DecoderConfig) (msg Message, err error)
2. func NewMessage(msgType MessageType) (msg Message, err error)
分别用来解析一条mqtt消息和构建一条mqtt消息。至此,这个project就很明显了,它只提供mqtt消息的处理。
然后让我们再回到之前调用他的那个project: 一起看看这个go语言实现的mqtt

// Implementation of MQTT V3.1 encoding and decoding.
//
// See http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
// for the MQTT protocol specification. This package does not implement the
// semantics of MQTT, but purely the encoding and decoding of its messages.
//
// Decoding Messages:
//
// Use the DecodeOneMessage function to read a Message from an io.Reader, it
// will return a Message value. The function can be implemented using the public
// API of this package if more control is required. For example:
//
//   for {
//     msg, err := mqtt.DecodeOneMessage(conn, nil)
//     if err != nil {
//       // handle err
//     }
//     switch msg := msg.(type) {
//     case *Connect:
//       // ...
//     case *Publish:
//       // ...
//       // etc.
//     }
//   }
//
// Encoding Messages:
//
// Create a message value, and use its Encode method to write it to an
// io.Writer. For example:
//
//   someData := []byte{1, 2, 3}
//   msg := &Publish{
//     Header: {
//       DupFlag: false,
//       QosLevel: QosAtLeastOnce,
//       Retain: false,
//     },
//     TopicName: "a/b",
//     MessageId: 10,
//     Payload: BytesPayload(someData),
//   }
//   if err := msg.Encode(conn); err != nil {
//     // handle err
//   }
//
// Advanced PUBLISH payload handling:
//
// The default behaviour for decoding PUBLISH payloads, and most common way to
// supply payloads for encoding, is the BytesPayload, which is a []byte
// derivative.
//
// More complex handling is possible by implementing the Payload interface,
// which can be injected into DecodeOneMessage via the `config` parameter, or
// into an outgoing Publish message via its Payload field.  Potential benefits
// of this include:
//
// * Data can be (un)marshalled directly on a connection, without an unecessary
// round-trip via bytes.Buffer.
//
// * Data can be streamed directly on readers/writers (e.g files, other
// connections, pipes) without the requirement to buffer an entire message
// payload in memory at once.
//
// The limitations of these streaming features are:
//
// * When encoding a payload, the encoded size of the payload must be known and
// declared upfront.
//
// * The payload size (and PUBLISH variable header) can be no more than 256MiB
// minus 1 byte. This is a specified limitation of MQTT v3.1 itself.
package mqtt

import (
        "errors"
        "io"
)

var (
        badMsgTypeError        = errors.New("mqtt: message type is invalid")
        badQosError            = errors.New("mqtt: QoS is invalid")
        badWillQosError        = errors.New("mqtt: will QoS is invalid")
        badLengthEncodingError = errors.New("mqtt: remaining length field exceeded maximum of 4 bytes")
        badReturnCodeError     = errors.New("mqtt: is invalid")
        dataExceedsPacketError = errors.New("mqtt: data exceeds packet length")
        msgTooLongError        = errors.New("mqtt: message is too long")
)

const (
        QosAtMostOnce = QosLevel(iota)
        QosAtLeastOnce
        QosExactlyOnce

        qosFirstInvalid
)

type QosLevel uint8

func (qos QosLevel) IsValid() bool {
        return qos < qosFirstInvalid
}

func (qos QosLevel) HasId() bool {
        return qos == QosAtLeastOnce || qos == QosExactlyOnce
}

const (
        RetCodeAccepted = ReturnCode(iota)
        RetCodeUnacceptableProtocolVersion
        RetCodeIdentifierRejected
        RetCodeServerUnavailable
        RetCodeBadUsernameOrPassword
        RetCodeNotAuthorized

        retCodeFirstInvalid
)

type ReturnCode uint8

func (rc ReturnCode) IsValid() bool {
        return rc >= RetCodeAccepted && rc < retCodeFirstInvalid
}

// DecoderConfig provides configuration for decoding messages.
type DecoderConfig interface {
        // MakePayload returns a Payload for the given Publish message. r is a Reader
        // that will read the payload data, and n is the number of bytes in the
        // payload. The Payload.ReadPayload method is called on the returned payload
        // by the decoding process.
        MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)
}

type DefaultDecoderConfig struct{}

func (c DefaultDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error) {
        return make(BytesPayload, n), nil
}

// ValueConfig always returns the given Payload when MakePayload is called.
type ValueConfig struct {
        Payload Payload
}

func (c *ValueConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error) {
        return c.Payload, nil
}

// DecodeOneMessage decodes one message from r. config provides specifics on
// how to decode messages, nil indicates that the DefaultDecoderConfig should
// be used.
func DecodeOneMessage(r io.Reader, config DecoderConfig) (msg Message, err error) {
        var hdr Header
        var msgType MessageType
        var packetRemaining int32
        msgType, packetRemaining, err = hdr.Decode(r)
        if err != nil {
                return
        }

        msg, err = NewMessage(msgType)
        if err != nil {
                return
        }

        if config == nil {
                config = DefaultDecoderConfig{}
        }

        return msg, msg.Decode(r, hdr, packetRemaining, config)
}

// NewMessage creates an instance of a Message value for the given message
// type. An error is returned if msgType is invalid.
func NewMessage(msgType MessageType) (msg Message, err error) {
        switch msgType {
        case MsgConnect:
                msg = new(Connect)
        case MsgConnAck:
                msg = new(ConnAck)
        case MsgPublish:
                msg = new(Publish)
        case MsgPubAck:
                msg = new(PubAck)
        case MsgPubRec:
                msg = new(PubRec)
        case MsgPubRel:
                msg = new(PubRel)
        case MsgPubComp:
                msg = new(PubComp)
        case MsgSubscribe:
                msg = new(Subscribe)
        case MsgUnsubAck:
                msg = new(UnsubAck)
        case MsgSubAck:
                msg = new(SubAck)
        case MsgUnsubscribe:
                msg = new(Unsubscribe)
        case MsgPingReq:
                msg = new(PingReq)
        case MsgPingResp:
                msg = new(PingResp)
        case MsgDisconnect:
                msg = new(Disconnect)
        default:
                return nil, badMsgTypeError
        }

        return
}

// panicErr wraps an error that caused a problem that needs to bail out of the
// API, such that errors can be recovered and returned as errors from the
// public API.
type panicErr struct {
        err error
}

func (p panicErr) Error() string {
        return p.err.Error()
}

func raiseError(err error) {
        panic(panicErr{err})
}

// recoverError recovers any panic in flight and, iff it's an error from
// raiseError, will return the error. Otherwise re-raises the panic value.
// If no panic is in flight, it returns existingErr.
//
// This must be used in combination with a defer in all public API entry
// points where raiseError could be called.
func recoverError(existingErr error, recovered interface{}) error {
        if recovered != nil {
                if pErr, ok := recovered.(panicErr); ok {
                        return pErr.err
                } else {
                        panic(recovered)
                }
        }
        return existingErr
}

使用特权

评论回复
12
keer_zu|  楼主 | 2017-3-8 08:40 | 只看该作者
回头再看看  mqtt_test.go ,就是对所有mqtt消息的decode和encode进行测试,调用mqtt.go中的API

至此这个github托管小项目就很清楚了。

使用特权

评论回复
13
keer_zu|  楼主 | 2017-3-8 08:40 | 只看该作者
yyy71cj 发表于 2017-3-8 08:06
全是代码,不知道爱代码的人感觉如何

肯定喜欢得不要不要的

使用特权

评论回复
14
keer_zu|  楼主 | 2017-3-8 13:19 | 只看该作者

这里,代码控还是很多的

使用特权

评论回复
15
keer_zu|  楼主 | 2017-3-8 15:17 | 只看该作者
yyy71cj 发表于 2017-3-8 15:13
说不定有人会喜欢得……要啊……要啊……的

余兄,此情此景求分享一篇原创小黄文。

使用特权

评论回复
16
keer_zu|  楼主 | 2017-3-8 15:56 | 只看该作者
yyy71cj 发表于 2017-3-8 15:56
想写的,就是不能当饭吃,我要先混口饭吃呢……

这个比写书挣钱

使用特权

评论回复
17
zhuyemm| | 2017-3-8 18:56 | 只看该作者
淡定,淡定,淡定……

使用特权

评论回复
18
keer_zu|  楼主 | 2017-6-12 22:13 | 只看该作者
yyy71cj 发表于 2017-3-8 16:01
是么?我得考虑考虑……

mosquitto

使用特权

评论回复
19
keer_zu|  楼主 | 2017-6-14 07:08 | 只看该作者

这个做mqtt服务器确实不错。

使用特权

评论回复
20
keer_zu|  楼主 | 2017-6-14 12:59 | 只看该作者
yyy71cj 发表于 2017-6-14 10:14
还没涉足这个领域,所以你突然说个不错,我不知道从哪里看过去…… ...

用过了才知道。

使用特权

评论回复
发新帖 我要提问
您需要登录后才可以回帖 登录 | 注册

本版积分规则

1349

主题

12426

帖子

53

粉丝