github.com/huin/mqtt项目解析

[复制链接]
 楼主| keer_zu 发表于 2017-3-7 14:38 | 显示全部楼层 |阅读模式
本帖最后由 keer_zu 于 2017-3-7 15:05 编辑

huin/mqtt

在分析:

一起看看这个go语言实现的mqtt
这个项目的时候,引用到了上面的/huin/mqtt。这里尝试做个分析。

@21ic小喇叭 要连载了,给个推荐啊
@dong_abc @yyy71cj @ddllxxrr @ningling_21 @guojihongwhpu1
给我给点建议。
 楼主| keer_zu 发表于 2017-3-7 14:41 | 显示全部楼层
mqtt

An MQTT encoder and decoder,written in Golang.

This library was modified heavily from https://github.com/plucury/mqtt.go and is API-incompatible with it.

Currently the library's API is unstable.

从上面介绍可知,主要是一个mqtt的编解码器。
 楼主| keer_zu 发表于 2017-3-7 14:43 | 显示全部楼层
先看它的TEST : mqtt_test.go

  1. package mqtt

  2. import (
  3.         "bytes"
  4.         "io"
  5.         "reflect"
  6.         "testing"

  7.         gbt "github.com/huin/gobinarytest"
  8. )

  9. type fakeSizePayload int

  10. func (p fakeSizePayload) Size() int {
  11.         return int(p)
  12. }

  13. func (p fakeSizePayload) WritePayload(w io.Writer) error {
  14.         return nil
  15. }

  16. func (p fakeSizePayload) ReadPayload(r io.Reader) error {
  17.         return nil
  18. }

  19. type fakeDecoderConfig struct{}

  20. func (c fakeDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error) {
  21.         return fakeSizePayload(n), nil
  22. }

  23. func TestEncodeDecode(t *testing.T) {
  24.         tests := []struct {
  25.                 Comment       string
  26.                 DecoderConfig DecoderConfig
  27.                 Msg           Message
  28.                 Expected      gbt.Matcher
  29.                 NoEncodeTest  bool
  30.         }{
  31.                 {
  32.                         Comment: "CONNECT message",
  33.                         Msg: &Connect{
  34.                                 ProtocolName:    "MQIsdp",
  35.                                 ProtocolVersion: 3,
  36.                                 UsernameFlag:    true,
  37.                                 PasswordFlag:    true,
  38.                                 WillRetain:      false,
  39.                                 WillQos:         1,
  40.                                 WillFlag:        true,
  41.                                 CleanSession:    true,
  42.                                 KeepAliveTimer:  10,
  43.                                 ClientId:        "xixihaha",
  44.                                 WillTopic:       "topic",
  45.                                 WillMessage:     "message",
  46.                                 Username:        "name",
  47.                                 Password:        "pwd",
  48.                         },
  49.                         Expected: gbt.InOrder{
  50.                                 gbt.Named{"Header byte", gbt.Literal{0x10}},
  51.                                 gbt.Named{"Remaining length", gbt.Literal{12 + 5*2 + 8 + 5 + 7 + 4 + 3}},

  52.                                 // Extended headers for CONNECT:
  53.                                 gbt.Named{"Protocol name", gbt.InOrder{gbt.Literal{0x00, 0x06}, gbt.Literal("MQIsdp")}},
  54.                                 gbt.Named{
  55.                                         "Extended headers for CONNECT",
  56.                                         gbt.Literal{
  57.                                                 0x03,       // Protocol version number
  58.                                                 0xce,       // Connect flags
  59.                                                 0x00, 0x0a, // Keep alive timer
  60.                                         },
  61.                                 },

  62.                                 // CONNECT payload:
  63.                                 gbt.Named{"Client identifier", gbt.InOrder{gbt.Literal{0x00, 0x08}, gbt.Literal("xixihaha")}},
  64.                                 gbt.Named{"Will topic", gbt.InOrder{gbt.Literal{0x00, 0x05}, gbt.Literal("topic")}},
  65.                                 gbt.Named{"Will message", gbt.InOrder{gbt.Literal{0x00, 0x07}, gbt.Literal("message")}},
  66.                                 gbt.Named{"Username", gbt.InOrder{gbt.Literal{0x00, 0x04}, gbt.Literal("name")}},
  67.                                 gbt.Named{"Password", gbt.InOrder{gbt.Literal{0x00, 0x03}, gbt.Literal("pwd")}},
  68.                         },
  69.                 },

  70.                 {
  71.                         Comment: "CONNACK message",
  72.                         Msg: &ConnAck{
  73.                                 ReturnCode: RetCodeBadUsernameOrPassword,
  74.                         },
  75.                         Expected: gbt.InOrder{
  76.                                 gbt.Named{"Header byte", gbt.Literal{0x20}},
  77.                                 gbt.Named{"Remaining length", gbt.Literal{2}},

  78.                                 gbt.Named{"Reserved byte", gbt.Literal{0}},
  79.                                 gbt.Named{"Return code", gbt.Literal{4}},
  80.                         },
  81.                 },

  82.                 {
  83.                         Comment: "PUBLISH message with QoS = QosAtMostOnce",
  84.                         Msg: &Publish{
  85.                                 Header: Header{
  86.                                         DupFlag:  false,
  87.                                         QosLevel: QosAtMostOnce,
  88.                                         Retain:   false,
  89.                                 },
  90.                                 TopicName: "a/b",
  91.                                 Payload:   BytesPayload{1, 2, 3},
  92.                         },
  93.                         Expected: gbt.InOrder{
  94.                                 gbt.Named{"Header byte", gbt.Literal{0x30}},
  95.                                 gbt.Named{"Remaining length", gbt.Literal{5 + 3}},

  96.                                 gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
  97.                                 // No MessageId should be present.
  98.                                 gbt.Named{"Data", gbt.Literal{1, 2, 3}},
  99.                         },
  100.                 },

  101.                 {
  102.                         Comment: "PUBLISH message with QoS = QosAtLeastOnce",
  103.                         Msg: &Publish{
  104.                                 Header: Header{
  105.                                         DupFlag:  true,
  106.                                         QosLevel: QosAtLeastOnce,
  107.                                         Retain:   false,
  108.                                 },
  109.                                 TopicName: "a/b",
  110.                                 MessageId: 0x1234,
  111.                                 Payload:   BytesPayload{1, 2, 3},
  112.                         },
  113.                         Expected: gbt.InOrder{
  114.                                 gbt.Named{"Header byte", gbt.Literal{0x3a}},
  115.                                 gbt.Named{"Remaining length", gbt.Literal{7 + 3}},

  116.                                 gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
  117.                                 gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
  118.                                 gbt.Named{"Data", gbt.Literal{1, 2, 3}},
  119.                         },
  120.                 },

  121.                 {
  122.                         Comment:       "PUBLISH message with maximum size payload",
  123.                         DecoderConfig: fakeDecoderConfig{},
  124.                         Msg: &Publish{
  125.                                 Header: Header{
  126.                                         DupFlag:  false,
  127.                                         QosLevel: QosAtMostOnce,
  128.                                         Retain:   false,
  129.                                 },
  130.                                 TopicName: "a/b",
  131.                                 Payload:   fakeSizePayload(MaxPayloadSize - 5),
  132.                         },
  133.                         Expected: gbt.InOrder{
  134.                                 gbt.Named{"Header byte", gbt.Literal{0x30}},
  135.                                 gbt.Named{"Remaining length", gbt.Literal{0xff, 0xff, 0xff, 0x7f}},

  136.                                 gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
  137.                                 // Our fake payload doesn't write any data, so no data should appear here.
  138.                         },
  139.                 },

  140.                 {
  141.                         Comment: "PUBACK message",
  142.                         Msg:     &PubAck{MessageId: 0x1234},
  143.                         Expected: gbt.InOrder{
  144.                                 gbt.Named{"Header byte", gbt.Literal{0x40}},
  145.                                 gbt.Named{"Remaining length", gbt.Literal{2}},
  146.                                 gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
  147.                         },
  148.                 },

  149.                 {
  150.                         Comment: "PUBREC message",
  151.                         Msg:     &PubRec{MessageId: 0x1234},
  152.                         Expected: gbt.InOrder{
  153.                                 gbt.Named{"Header byte", gbt.Literal{0x50}},
  154.                                 gbt.Named{"Remaining length", gbt.Literal{2}},
  155.                                 gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
  156.                         },
  157.                 },

  158.                 {
  159.                         Comment: "PUBREL message",
  160.                         Msg:     &PubRel{MessageId: 0x1234},
  161.                         Expected: gbt.InOrder{
  162.                                 gbt.Named{"Header byte", gbt.Literal{0x60}},
  163.                                 gbt.Named{"Remaining length", gbt.Literal{2}},
  164.                                 gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
  165.                         },
  166.                 },

  167.                 {
  168.                         Comment: "PUBCOMP message",
  169.                         Msg:     &PubComp{MessageId: 0x1234},
  170.                         Expected: gbt.InOrder{
  171.                                 gbt.Named{"Header byte", gbt.Literal{0x70}},
  172.                                 gbt.Named{"Remaining length", gbt.Literal{2}},
  173.                                 gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
  174.                         },
  175.                 },

  176.                 {
  177.                         Comment: "SUBSCRIBE message",
  178.                         Msg: &Subscribe{
  179.                                 Header: Header{
  180.                                         DupFlag:  false,
  181.                                         QosLevel: QosAtLeastOnce,
  182.                                 },
  183.                                 MessageId: 0x4321,
  184.                                 Topics: []TopicQos{
  185.                                         {"a/b", QosAtLeastOnce},
  186.                                         {"c/d", QosExactlyOnce},
  187.                                 },
  188.                         },
  189.                         Expected: gbt.InOrder{
  190.                                 gbt.Named{"Header byte", gbt.Literal{0x82}},
  191.                                 gbt.Named{"Remaining length", gbt.Literal{2 + 5 + 1 + 5 + 1}},

  192.                                 gbt.Named{"MessageId", gbt.Literal{0x43, 0x21}},
  193.                                 gbt.Named{"First topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
  194.                                 gbt.Named{"First topic QoS", gbt.Literal{1}},
  195.                                 gbt.Named{"Second topic", gbt.Literal{0x00, 0x03, 'c', '/', 'd'}},
  196.                                 gbt.Named{"Second topic QoS", gbt.Literal{2}},
  197.                         },
  198.                 },

  199.                 {
  200.                         Comment: "SUBACK message",
  201.                         Msg: &SubAck{
  202.                                 MessageId: 0x1234,
  203.                                 TopicsQos: []QosLevel{QosAtMostOnce, QosExactlyOnce},
  204.                         },
  205.                         Expected: gbt.InOrder{
  206.                                 gbt.Named{"Header byte", gbt.Literal{0x90}},
  207.                                 gbt.Named{"Remaining length", gbt.Literal{4}},
  208.                                 gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
  209.                                 gbt.Named{"TopicsQos", gbt.Literal{0x00, 0x02}},
  210.                         },
  211.                 },

  212.                 {
  213.                         Comment: "UNSUBSCRIBE message",
  214.                         Msg: &Unsubscribe{
  215.                                 Header: Header{
  216.                                         DupFlag:  false,
  217.                                         QosLevel: QosAtLeastOnce,
  218.                                 },
  219.                                 MessageId: 0x4321,
  220.                                 Topics:    []string{"a/b", "c/d"},
  221.                         },
  222.                         Expected: gbt.InOrder{
  223.                                 gbt.Named{"Header byte", gbt.Literal{0xa2}},
  224.                                 gbt.Named{"Remaining length", gbt.Literal{2 + 5 + 5}},

  225.                                 gbt.Named{"MessageId", gbt.Literal{0x43, 0x21}},
  226.                                 gbt.Named{"First topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
  227.                                 gbt.Named{"Second topic", gbt.Literal{0x00, 0x03, 'c', '/', 'd'}},
  228.                         },
  229.                 },

  230.                 {
  231.                         Comment: "UNSUBACK message",
  232.                         Msg:     &UnsubAck{MessageId: 0x1234},
  233.                         Expected: gbt.InOrder{
  234.                                 gbt.Named{"Header byte", gbt.Literal{0xb0}},
  235.                                 gbt.Named{"Remaining length", gbt.Literal{2}},
  236.                                 gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
  237.                         },
  238.                 },

  239.                 {
  240.                         Comment: "PINGREQ message",
  241.                         Msg:     &PingReq{},
  242.                         Expected: gbt.InOrder{
  243.                                 gbt.Named{"Header byte", gbt.Literal{0xc0}},
  244.                                 gbt.Named{"Remaining length", gbt.Literal{0}},
  245.                         },
  246.                 },

  247.                 {
  248.                         Comment: "PINGRESP message",
  249.                         Msg:     &PingResp{},
  250.                         Expected: gbt.InOrder{
  251.                                 gbt.Named{"Header byte", gbt.Literal{0xd0}},
  252.                                 gbt.Named{"Remaining length", gbt.Literal{0}},
  253.                         },
  254.                 },

  255.                 {
  256.                         Comment: "DISCONNECT message",
  257.                         Msg:     &Disconnect{},
  258.                         Expected: gbt.InOrder{
  259.                                 gbt.Named{"Header byte", gbt.Literal{0xe0}},
  260.                                 gbt.Named{"Remaining length", gbt.Literal{0}},
  261.                         },
  262.                 },
  263.         }

  264.         for _, test := range tests {
  265.                 {
  266.                         // Test decoding.
  267.                         expectedBuf := new(bytes.Buffer)
  268.                         test.Expected.Write(expectedBuf)

  269.                         if decodedMsg, err := DecodeOneMessage(expectedBuf, test.DecoderConfig); err != nil {
  270.                                 t.Errorf("%s: Unexpected error during decoding: %v", test.Comment, err)
  271.                         } else if !reflect.DeepEqual(test.Msg, decodedMsg) {
  272.                                 t.Errorf("%s: Decoded value mismatch\n     got = %#v\nexpected = %#v",
  273.                                         test.Comment, decodedMsg, test.Msg)
  274.                         }
  275.                 }

  276.                 if !test.NoEncodeTest {
  277.                         // Test encoding.
  278.                         encodedBuf := new(bytes.Buffer)
  279.                         if err := test.Msg.Encode(encodedBuf); err != nil {
  280.                                 t.Errorf("%s: Unexpected error during encoding: %v", test.Comment, err)
  281.                         } else if err = gbt.Matches(test.Expected, encodedBuf.Bytes()); err != nil {
  282.                                 t.Errorf("%s: Unexpected encoding output: %v", test.Comment, err)
  283.                         }
  284.                 }
  285.         }
  286. }

  287. func TestErrorEncode(t *testing.T) {
  288.         tests := []struct {
  289.                 Comment string
  290.                 Msg     Message
  291.         }{
  292.                 {
  293.                         Comment: "Payload reports Size() that's too large for MQTT payload.",
  294.                         Msg: &Publish{
  295.                                 TopicName: "big/message",
  296.                                 MessageId: 0x1234,
  297.                                 // MaxPayloadSize-5 is too large - the payload space is further
  298.                                 // restricted by the variable header.
  299.                                 Payload: fakeSizePayload(MaxPayloadSize),
  300.                         },
  301.                 },
  302.                 {
  303.                         Comment: "Payload reports Size() that would overflow when added to variable header size.",
  304.                         Msg: &Publish{
  305.                                 TopicName: "big/message",
  306.                                 MessageId: 0x1234,
  307.                                 Payload:   fakeSizePayload(0x7fffffff),
  308.                         },
  309.                 },
  310.         }

  311.         for _, test := range tests {
  312.                 encodedBuf := new(bytes.Buffer)
  313.                 if err := test.Msg.Encode(encodedBuf); err == nil {
  314.                         t.Errorf("%s: Expected error during encoding, but got nil.", test.Comment)
  315.                 }
  316.         }
  317. }

  318. func TestErrorDecode(t *testing.T) {
  319.         tests := []struct {
  320.                 Comment  string
  321.                 Expected gbt.Matcher
  322.         }{
  323.                 {
  324.                         Comment:  "Immediate EOF",
  325.                         Expected: gbt.Literal{},
  326.                 },
  327.                 {
  328.                         Comment:  "EOF at 1 byte",
  329.                         Expected: gbt.Literal{0x10},
  330.                 },
  331.                 {
  332.                         Comment: "PUBACK message with too short a length",
  333.                         Expected: gbt.InOrder{
  334.                                 gbt.Named{"Header byte", gbt.Literal{0x40}},
  335.                                 gbt.Named{"Remaining length", gbt.Literal{1}},

  336.                                 gbt.Named{"Truncated MessageId", gbt.Literal{0x12}},
  337.                         },
  338.                 },
  339.                 {
  340.                         Comment: "PUBACK message with too long a length",
  341.                         Expected: gbt.InOrder{
  342.                                 gbt.Named{"Header byte", gbt.Literal{0x40}},
  343.                                 gbt.Named{"Remaining length", gbt.Literal{3}},

  344.                                 gbt.Named{"Truncated MessageId", gbt.Literal{0x12, 0x34, 0x56}},
  345.                         },
  346.                 },
  347.         }

  348.         for _, test := range tests {
  349.                 expectedBuf := new(bytes.Buffer)
  350.                 test.Expected.Write(expectedBuf)

  351.                 if _, err := DecodeOneMessage(expectedBuf, nil); err == nil {
  352.                         t.Errorf("%s: Expected error during decoding, but got nil.", test.Comment)
  353.                 }
  354.         }
  355. }

  356. func TestLengthEncodeDecode(t *testing.T) {
  357.         tests := []struct {
  358.                 Value   int32
  359.                 Encoded gbt.Matcher
  360.         }{
  361.                 {0, gbt.Literal{0}},
  362.                 {1, gbt.Literal{1}},
  363.                 {20, gbt.Literal{20}},

  364.                 // Boundary conditions used as tests taken from MQTT 3.1 spec.
  365.                 {0, gbt.Literal{0x00}},
  366.                 {127, gbt.Literal{0x7F}},
  367.                 {128, gbt.Literal{0x80, 0x01}},
  368.                 {16383, gbt.Literal{0xFF, 0x7F}},
  369.                 {16384, gbt.Literal{0x80, 0x80, 0x01}},
  370.                 {2097151, gbt.Literal{0xFF, 0xFF, 0x7F}},
  371.                 {2097152, gbt.Literal{0x80, 0x80, 0x80, 0x01}},
  372.                 {268435455, gbt.Literal{0xFF, 0xFF, 0xFF, 0x7F}},
  373.         }

  374.         for _, test := range tests {
  375.                 {
  376.                         // Test decoding.
  377.                         buf := new(bytes.Buffer)
  378.                         test.Encoded.Write(buf)
  379.                         buf = bytes.NewBuffer(buf.Bytes())
  380.                         if result := decodeLength(buf); test.Value != result {
  381.                                 t.Errorf("Decoding test %#x: got %#x", test.Value, result)
  382.                         }
  383.                 }
  384.                 {
  385.                         // Test encoding.
  386.                         buf := new(bytes.Buffer)
  387.                         encodeLength(test.Value, buf)
  388.                         if err := gbt.Matches(test.Encoded, buf.Bytes()); err != nil {
  389.                                 t.Errorf("Encoding test %#x: %v", test.Value, err)
  390.                         }
  391.                 }
  392.         }
  393. }

  394. type SeqBytePayload struct {
  395.         N int
  396.         T *testing.T
  397. }

  398. func (p *SeqBytePayload) Size() int {
  399.         return p.N
  400. }

  401. func (p *SeqBytePayload) WritePayload(w io.Writer) error {
  402.         buf := make([]byte, 256)
  403.         for i := range buf {
  404.                 buf[i] = byte(i)
  405.         }

  406.         for toWrite := p.N; toWrite > 0; toWrite -= 256 {
  407.                 writeThisTime := toWrite
  408.                 if writeThisTime > 256 {
  409.                         writeThisTime = 256
  410.                 }
  411.                 if _, err := w.Write(buf[:writeThisTime]); err != nil {
  412.                         return err
  413.                 }
  414.         }
  415.         return nil
  416. }

  417. func (p *SeqBytePayload) ReadPayload(r io.Reader) error {
  418.         buf := make([]byte, 256)

  419.         for toRead := p.N; toRead > 0; toRead -= 256 {
  420.                 readThisTime := toRead
  421.                 if readThisTime > 256 {
  422.                         readThisTime = 256
  423.                 }
  424.                 if _, err := io.ReadFull(r, buf[:readThisTime]); err != nil {
  425.                         p.T.Errorf("Got unexpected error %v", err)
  426.                         return err
  427.                 }
  428.                 for i, v := range buf[:readThisTime] {
  429.                         if v != byte(i) {
  430.                                 p.T.Errorf("Got unexpected byte %#x, expected %#x", v, i)
  431.                                 return io.ErrClosedPipe
  432.                         }
  433.                 }
  434.         }

  435.         // Any more reads should produce an EOF.
  436.         if n, err := r.Read(buf); n > 0 {
  437.                 p.T.Errorf("Got unexpected extra %d bytes of data", n)
  438.         } else if err != io.EOF {
  439.                 p.T.Errorf("Expected err=EOF, got %v", err)
  440.         }

  441.         return nil
  442. }

  443. // Stream a reasonably large Publish payload containing sequential byte values.
  444. func TestPipedPublish(t *testing.T) {
  445.         r, w := io.Pipe()

  446.         complete := make(chan bool)

  447.         payload := &SeqBytePayload{
  448.                 N: 20 + (1 << 20),
  449.                 T: t,
  450.         }

  451.         go func() {
  452.                 defer func() {
  453.                         complete <- true
  454.                         w.Close()
  455.                 }()

  456.                 msg := &Publish{
  457.                         TopicName: "foo",
  458.                         Payload:   payload,
  459.                 }
  460.                 if err := msg.Encode(w); err != nil {
  461.                         t.Error(err)
  462.                 }
  463.         }()

  464.         go func() {
  465.                 defer func() {
  466.                         complete <- true
  467.                         r.Close()
  468.                 }()

  469.                 expectedMsg := &Publish{
  470.                         TopicName: "foo",
  471.                         Payload:   payload,
  472.                 }

  473.                 testConfig := &ValueConfig{payload}

  474.                 if msg, err := DecodeOneMessage(r, testConfig); err != nil {
  475.                         t.Error(err)
  476.                 } else if !reflect.DeepEqual(expectedMsg, msg) {
  477.                         t.Errorf("     got = %#v\nexpected = %#v", msg, expectedMsg)
  478.                 }
  479.         }()

  480.         _ = <-complete
  481.         _ = <-complete
  482. }

 楼主| keer_zu 发表于 2017-3-7 15:14 | 显示全部楼层
本帖最后由 keer_zu 于 2017-3-7 15:16 编辑
keer_zu 发表于 2017-3-7 14:43
先看它的TEST : mqtt_test.go

第28行:func (c fakeDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)

这里涉及到一个类型 Publish,下面就先来分析一下这个类型:

// Publish represents an MQTT PUBLISH message.

type Publish struct {
        Header
        TopicName string
        MessageId uint16
        Payload   Payload
}





上面是头。

3.3.2 可变报头可变报头按顺序包含主题名和报文标识符。
主题名 Topic Name主题名(Topic Name)用于识别有效载荷数据应该被发布到哪一个信息通道。
主题名必须是PUBLISH报文可变报头的第一个字段。它必须是 1.5.3节定义的UTF-8编码的字符串 [MQTT-3.3.2-1]。
PUBLISH报文中的主题名不能包含通配符 [MQTT-3.3.2-2]。
服务端发送给订阅客户端的PUBLISH报文的主题名必须匹配该订阅的主题过滤器(根据 4.7节定义的匹配过程)[MQTT-3.3.2-3]。
报文标识符 Packet Identifier只有当QoS等级是1或2时,报文标识符(Packet Identifier)字段才能出现在PUBLISH报文中。2.3.1节提供了有关报文标识符的更多信息。
可变报头非规范示例图例 3.11 – PUBLISH报文可变报头非规范示例 举例说明了 表格 3.3 - PUBLISH报文非规范示例 中简要描述的PUBLISH报文的可变报头。
表格 3.3 - PUBLISH报文非规范示例[td]
FieldValue
主题名a/b
报文标识符10





本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?注册

×
 楼主| keer_zu 发表于 2017-3-7 15:18 | 显示全部楼层
keer_zu 发表于 2017-3-7 15:14
第28行:func (c fakeDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)

...

有效载荷

有效载荷包含将被发布的应用消息。数据的内容和格式是应用特定的。有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度。包含零长度有效载荷的PUBLISH报文是合法的。

以上就是publish的三部分组成。

  1. func (msg *Publish) Encode(w io.Writer) (err error) {
  2.         buf := new(bytes.Buffer)

  3.         setString(msg.TopicName, buf)
  4.         if msg.Header.QosLevel.HasId() {
  5.                 setUint16(msg.MessageId, buf)
  6.         }

  7.         if err = writeMessage(w, MsgPublish, &msg.Header, buf, int32(msg.Payload.Size())); err != nil {
  8.                 return
  9.         }

  10.         return msg.Payload.WritePayload(w)
  11. }

  12. func (msg *Publish) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error) {
  13.         defer func() {
  14.                 err = recoverError(err, recover())
  15.         }()

  16.         msg.Header = hdr

  17.         msg.TopicName = getString(r, &packetRemaining)
  18.         if msg.Header.QosLevel.HasId() {
  19.                 msg.MessageId = getUint16(r, &packetRemaining)
  20.         }

  21.         payloadReader := &io.LimitedReader{r, int64(packetRemaining)}

  22.         if msg.Payload, err = config.MakePayload(msg, payloadReader, int(packetRemaining)); err != nil {
  23.                 return
  24.         }

  25.         return msg.Payload.ReadPayload(payloadReader)
  26. }


该类型支持这两者方法,分别是编码和解码。
 楼主| keer_zu 发表于 2017-3-7 16:37 | 显示全部楼层
yyy71cj 发表于 2017-3-7 15:18
本想给你加个精的,一看是他人板块……

 楼主| keer_zu 发表于 2017-3-7 17:10 | 显示全部楼层
 楼主| keer_zu 发表于 2017-3-7 21:02 | 显示全部楼层
可以看到整个message.go就是对:


    3.0 Contents – MQTT控制报文
    3.1 CONNECT – 连接服务端
    3.2 CONNACK – 确认连接请求
    3.3 PUBLISH – 发布消息
    3.4 PUBACK –发布确认
    3.5 PUBREC – 发布收到(QoS 2,第一步)
    3.6 PUBREL – 发布释放(QoS 2,第二步)
    3.7 PUBCOMP – 发布完成(QoS 2,第三步)
    3.8 SUBSCRIBE - 订阅主题
    3.9 SUBACK – 订阅确认
    3.10 UNSUBSCRIBE –取消订阅
    3.11 UNSUBACK – 取消订阅确认
    3.12 PINGREQ – 心跳请求
    3.13 PINGRESP – 心跳响应
    3.14 DISCONNECT –断开连接

这些消息的编解码。
 楼主| keer_zu 发表于 2017-3-7 21:04 | 显示全部楼层
encodeing.go 只是最基本的运算和转换


  1. package mqtt

  2. import (
  3.         "bytes"
  4.         "io"
  5. )

  6. func getUint8(r io.Reader, packetRemaining *int32) uint8 {
  7.         if *packetRemaining < 1 {
  8.                 raiseError(dataExceedsPacketError)
  9.         }

  10.         var b [1]byte
  11.         if _, err := io.ReadFull(r, b[:]); err != nil {
  12.                 raiseError(err)
  13.         }
  14.         *packetRemaining--

  15.         return b[0]
  16. }

  17. func getUint16(r io.Reader, packetRemaining *int32) uint16 {
  18.         if *packetRemaining < 2 {
  19.                 raiseError(dataExceedsPacketError)
  20.         }

  21.         var b [2]byte
  22.         if _, err := io.ReadFull(r, b[:]); err != nil {
  23.                 raiseError(err)
  24.         }
  25.         *packetRemaining -= 2

  26.         return uint16(b[0])<<8 | uint16(b[1])
  27. }

  28. func getString(r io.Reader, packetRemaining *int32) string {
  29.         strLen := int(getUint16(r, packetRemaining))

  30.         if int(*packetRemaining) < strLen {
  31.                 raiseError(dataExceedsPacketError)
  32.         }

  33.         b := make([]byte, strLen)
  34.         if _, err := io.ReadFull(r, b); err != nil {
  35.                 raiseError(err)
  36.         }
  37.         *packetRemaining -= int32(strLen)

  38.         return string(b)
  39. }

  40. func setUint8(val uint8, buf *bytes.Buffer) {
  41.         buf.WriteByte(byte(val))
  42. }

  43. func setUint16(val uint16, buf *bytes.Buffer) {
  44.         buf.WriteByte(byte(val & 0xff00 >> 8))
  45.         buf.WriteByte(byte(val & 0x00ff))
  46. }

  47. func setString(val string, buf *bytes.Buffer) {
  48.         length := uint16(len(val))
  49.         setUint16(length, buf)
  50.         buf.WriteString(val)
  51. }

  52. func boolToByte(val bool) byte {
  53.         if val {
  54.                 return byte(1)
  55.         }
  56.         return byte(0)
  57. }

  58. func decodeLength(r io.Reader) int32 {
  59.         var v int32
  60.         var buf [1]byte
  61.         var shift uint
  62.         for i := 0; i < 4; i++ {
  63.                 if _, err := io.ReadFull(r, buf[:]); err != nil {
  64.                         raiseError(err)
  65.                 }

  66.                 b := buf[0]
  67.                 v |= int32(b&0x7f) << shift

  68.                 if b&0x80 == 0 {
  69.                         return v
  70.                 }
  71.                 shift += 7
  72.         }

  73.         raiseError(badLengthEncodingError)
  74.         panic("unreachable")
  75. }

  76. func encodeLength(length int32, buf *bytes.Buffer) {
  77.         if length == 0 {
  78.                 buf.WriteByte(0)
  79.                 return
  80.         }
  81.         for length > 0 {
  82.                 digit := length & 0x7f
  83.                 length = length >> 7
  84.                 if length > 0 {
  85.                         digit = digit | 0x80
  86.                 }
  87.                 buf.WriteByte(byte(digit))
  88.         }
  89. }




 楼主| keer_zu 发表于 2017-3-7 21:11 | 显示全部楼层
payload.go 很短,定义了Payload接口,同时实现了BytesPayload和StreamedPayload两种类型。

他们都可以以这个接口调用,具体对外的方法有:
Size()  ----- 返回Payload的长度。
WritePayload() ------ 写Payload
ReadPayload() ------ 读Payload

代码如下:

  1. package mqtt

  2. import (
  3.         "io"
  4. )

  5. // Payload is the interface for Publish payloads. Typically the BytesPayload
  6. // implementation will be sufficient for small payloads whose full contents
  7. // will exist in memory. However, other implementations can read or write
  8. // payloads requiring them holding their complete contents in memory.
  9. type Payload interface {
  10.         // Size returns the number of bytes that WritePayload will write.
  11.         Size() int

  12.         // WritePayload writes the payload data to w. Implementations must write
  13.         // Size() bytes of data, but it is *not* required to do so prior to
  14.         // returning. Size() bytes must have been written to w prior to another
  15.         // message being encoded to the underlying connection.
  16.         WritePayload(w io.Writer) error

  17.         // ReadPayload reads the payload data from r (r will EOF at the end of the
  18.         // payload). It is *not* required for r to have been consumed prior to this
  19.         // returning. r must have been consumed completely prior to another message
  20.         // being decoded from the underlying connection.
  21.         ReadPayload(r io.Reader) error
  22. }

  23. // BytesPayload reads/writes a plain slice of bytes.
  24. type BytesPayload []byte

  25. func (p BytesPayload) Size() int {
  26.         return len(p)
  27. }

  28. func (p BytesPayload) WritePayload(w io.Writer) error {
  29.         _, err := w.Write(p)
  30.         return err
  31. }

  32. func (p BytesPayload) ReadPayload(r io.Reader) error {
  33.         _, err := io.ReadFull(r, p)
  34.         return err
  35. }

  36. // StreamedPayload writes payload data from reader, or reads payload data into a writer.
  37. type StreamedPayload struct {
  38.         // N indicates payload size to the encoder. This many bytes will be read from
  39.         // the reader when encoding. The number of bytes in the payload will be
  40.         // stored here when decoding.
  41.         N int

  42.         // EncodingSource is used to copy data from when encoding a Publish message
  43.         // onto the wire. This can be
  44.         EncodingSource io.Reader

  45.         // DecodingSink is used to copy data to when decoding a Publish message from
  46.         // the wire. This can be nil if the payload is only being used for encoding.
  47.         DecodingSink io.Writer
  48. }

  49. func (p *StreamedPayload) Size() int {
  50.         return p.N
  51. }

  52. func (p *StreamedPayload) WritePayload(w io.Writer) error {
  53.         _, err := io.CopyN(w, p.EncodingSource, int64(p.N))
  54.         return err
  55. }

  56. func (p *StreamedPayload) ReadPayload(r io.Reader) error {
  57.         n, err := io.Copy(p.DecodingSink, r)
  58.         p.N = int(n)
  59.         return err
  60. }


 楼主| keer_zu 发表于 2017-3-7 22:03 | 显示全部楼层
再来看看mqtt.go
他是在messages.go和payload.go等基础上 的再封装,可以从前面注释和后面代码看出:
他主要提供了两个API:
1. func DecodeOneMessage(r io.Reader, config DecoderConfig) (msg Message, err error)
2. func NewMessage(msgType MessageType) (msg Message, err error)
分别用来解析一条mqtt消息和构建一条mqtt消息。至此,这个project就很明显了,它只提供mqtt消息的处理。
然后让我们再回到之前调用他的那个project: 一起看看这个go语言实现的mqtt

  1. // Implementation of MQTT V3.1 encoding and decoding.
  2. //
  3. // See http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
  4. // for the MQTT protocol specification. This package does not implement the
  5. // semantics of MQTT, but purely the encoding and decoding of its messages.
  6. //
  7. // Decoding Messages:
  8. //
  9. // Use the DecodeOneMessage function to read a Message from an io.Reader, it
  10. // will return a Message value. The function can be implemented using the public
  11. // API of this package if more control is required. For example:
  12. //
  13. //   for {
  14. //     msg, err := mqtt.DecodeOneMessage(conn, nil)
  15. //     if err != nil {
  16. //       // handle err
  17. //     }
  18. //     switch msg := msg.(type) {
  19. //     case *Connect:
  20. //       // ...
  21. //     case *Publish:
  22. //       // ...
  23. //       // etc.
  24. //     }
  25. //   }
  26. //
  27. // Encoding Messages:
  28. //
  29. // Create a message value, and use its Encode method to write it to an
  30. // io.Writer. For example:
  31. //
  32. //   someData := []byte{1, 2, 3}
  33. //   msg := &Publish{
  34. //     Header: {
  35. //       DupFlag: false,
  36. //       QosLevel: QosAtLeastOnce,
  37. //       Retain: false,
  38. //     },
  39. //     TopicName: "a/b",
  40. //     MessageId: 10,
  41. //     Payload: BytesPayload(someData),
  42. //   }
  43. //   if err := msg.Encode(conn); err != nil {
  44. //     // handle err
  45. //   }
  46. //
  47. // Advanced PUBLISH payload handling:
  48. //
  49. // The default behaviour for decoding PUBLISH payloads, and most common way to
  50. // supply payloads for encoding, is the BytesPayload, which is a []byte
  51. // derivative.
  52. //
  53. // More complex handling is possible by implementing the Payload interface,
  54. // which can be injected into DecodeOneMessage via the `config` parameter, or
  55. // into an outgoing Publish message via its Payload field.  Potential benefits
  56. // of this include:
  57. //
  58. // * Data can be (un)marshalled directly on a connection, without an unecessary
  59. // round-trip via bytes.Buffer.
  60. //
  61. // * Data can be streamed directly on readers/writers (e.g files, other
  62. // connections, pipes) without the requirement to buffer an entire message
  63. // payload in memory at once.
  64. //
  65. // The limitations of these streaming features are:
  66. //
  67. // * When encoding a payload, the encoded size of the payload must be known and
  68. // declared upfront.
  69. //
  70. // * The payload size (and PUBLISH variable header) can be no more than 256MiB
  71. // minus 1 byte. This is a specified limitation of MQTT v3.1 itself.
  72. package mqtt

  73. import (
  74.         "errors"
  75.         "io"
  76. )

  77. var (
  78.         badMsgTypeError        = errors.New("mqtt: message type is invalid")
  79.         badQosError            = errors.New("mqtt: QoS is invalid")
  80.         badWillQosError        = errors.New("mqtt: will QoS is invalid")
  81.         badLengthEncodingError = errors.New("mqtt: remaining length field exceeded maximum of 4 bytes")
  82.         badReturnCodeError     = errors.New("mqtt: is invalid")
  83.         dataExceedsPacketError = errors.New("mqtt: data exceeds packet length")
  84.         msgTooLongError        = errors.New("mqtt: message is too long")
  85. )

  86. const (
  87.         QosAtMostOnce = QosLevel(iota)
  88.         QosAtLeastOnce
  89.         QosExactlyOnce

  90.         qosFirstInvalid
  91. )

  92. type QosLevel uint8

  93. func (qos QosLevel) IsValid() bool {
  94.         return qos < qosFirstInvalid
  95. }

  96. func (qos QosLevel) HasId() bool {
  97.         return qos == QosAtLeastOnce || qos == QosExactlyOnce
  98. }

  99. const (
  100.         RetCodeAccepted = ReturnCode(iota)
  101.         RetCodeUnacceptableProtocolVersion
  102.         RetCodeIdentifierRejected
  103.         RetCodeServerUnavailable
  104.         RetCodeBadUsernameOrPassword
  105.         RetCodeNotAuthorized

  106.         retCodeFirstInvalid
  107. )

  108. type ReturnCode uint8

  109. func (rc ReturnCode) IsValid() bool {
  110.         return rc >= RetCodeAccepted && rc < retCodeFirstInvalid
  111. }

  112. // DecoderConfig provides configuration for decoding messages.
  113. type DecoderConfig interface {
  114.         // MakePayload returns a Payload for the given Publish message. r is a Reader
  115.         // that will read the payload data, and n is the number of bytes in the
  116.         // payload. The Payload.ReadPayload method is called on the returned payload
  117.         // by the decoding process.
  118.         MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)
  119. }

  120. type DefaultDecoderConfig struct{}

  121. func (c DefaultDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error) {
  122.         return make(BytesPayload, n), nil
  123. }

  124. // ValueConfig always returns the given Payload when MakePayload is called.
  125. type ValueConfig struct {
  126.         Payload Payload
  127. }

  128. func (c *ValueConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error) {
  129.         return c.Payload, nil
  130. }

  131. // DecodeOneMessage decodes one message from r. config provides specifics on
  132. // how to decode messages, nil indicates that the DefaultDecoderConfig should
  133. // be used.
  134. func DecodeOneMessage(r io.Reader, config DecoderConfig) (msg Message, err error) {
  135.         var hdr Header
  136.         var msgType MessageType
  137.         var packetRemaining int32
  138.         msgType, packetRemaining, err = hdr.Decode(r)
  139.         if err != nil {
  140.                 return
  141.         }

  142.         msg, err = NewMessage(msgType)
  143.         if err != nil {
  144.                 return
  145.         }

  146.         if config == nil {
  147.                 config = DefaultDecoderConfig{}
  148.         }

  149.         return msg, msg.Decode(r, hdr, packetRemaining, config)
  150. }

  151. // NewMessage creates an instance of a Message value for the given message
  152. // type. An error is returned if msgType is invalid.
  153. func NewMessage(msgType MessageType) (msg Message, err error) {
  154.         switch msgType {
  155.         case MsgConnect:
  156.                 msg = new(Connect)
  157.         case MsgConnAck:
  158.                 msg = new(ConnAck)
  159.         case MsgPublish:
  160.                 msg = new(Publish)
  161.         case MsgPubAck:
  162.                 msg = new(PubAck)
  163.         case MsgPubRec:
  164.                 msg = new(PubRec)
  165.         case MsgPubRel:
  166.                 msg = new(PubRel)
  167.         case MsgPubComp:
  168.                 msg = new(PubComp)
  169.         case MsgSubscribe:
  170.                 msg = new(Subscribe)
  171.         case MsgUnsubAck:
  172.                 msg = new(UnsubAck)
  173.         case MsgSubAck:
  174.                 msg = new(SubAck)
  175.         case MsgUnsubscribe:
  176.                 msg = new(Unsubscribe)
  177.         case MsgPingReq:
  178.                 msg = new(PingReq)
  179.         case MsgPingResp:
  180.                 msg = new(PingResp)
  181.         case MsgDisconnect:
  182.                 msg = new(Disconnect)
  183.         default:
  184.                 return nil, badMsgTypeError
  185.         }

  186.         return
  187. }

  188. // panicErr wraps an error that caused a problem that needs to bail out of the
  189. // API, such that errors can be recovered and returned as errors from the
  190. // public API.
  191. type panicErr struct {
  192.         err error
  193. }

  194. func (p panicErr) Error() string {
  195.         return p.err.Error()
  196. }

  197. func raiseError(err error) {
  198.         panic(panicErr{err})
  199. }

  200. // recoverError recovers any panic in flight and, iff it's an error from
  201. // raiseError, will return the error. Otherwise re-raises the panic value.
  202. // If no panic is in flight, it returns existingErr.
  203. //
  204. // This must be used in combination with a defer in all public API entry
  205. // points where raiseError could be called.
  206. func recoverError(existingErr error, recovered interface{}) error {
  207.         if recovered != nil {
  208.                 if pErr, ok := recovered.(panicErr); ok {
  209.                         return pErr.err
  210.                 } else {
  211.                         panic(recovered)
  212.                 }
  213.         }
  214.         return existingErr
  215. }
 楼主| keer_zu 发表于 2017-3-8 08:40 | 显示全部楼层
回头再看看  mqtt_test.go ,就是对所有mqtt消息的decode和encode进行测试,调用mqtt.go中的API

至此这个github托管小项目就很清楚了。
 楼主| keer_zu 发表于 2017-3-8 08:40 | 显示全部楼层
yyy71cj 发表于 2017-3-8 08:06
全是代码,不知道爱代码的人感觉如何

肯定喜欢得不要不要的
 楼主| keer_zu 发表于 2017-3-8 13:19 | 显示全部楼层

这里,代码控还是很多的
 楼主| keer_zu 发表于 2017-3-8 15:17 | 显示全部楼层
yyy71cj 发表于 2017-3-8 15:13
说不定有人会喜欢得……要啊……要啊……的

余兄,此情此景求分享一篇原创小黄文。
 楼主| keer_zu 发表于 2017-3-8 15:56 | 显示全部楼层
yyy71cj 发表于 2017-3-8 15:56
想写的,就是不能当饭吃,我要先混口饭吃呢……

这个比写书挣钱
zhuyemm 发表于 2017-3-8 18:56 | 显示全部楼层
淡定,淡定,淡定……
 楼主| keer_zu 发表于 2017-6-12 22:13 | 显示全部楼层
yyy71cj 发表于 2017-3-8 16:01
是么?我得考虑考虑……

mosquitto
 楼主| keer_zu 发表于 2017-6-14 07:08 | 显示全部楼层

这个做mqtt服务器确实不错。
 楼主| keer_zu 发表于 2017-6-14 12:59 | 显示全部楼层
yyy71cj 发表于 2017-6-14 10:14
还没涉足这个领域,所以你突然说个不错,我不知道从哪里看过去…… ...

用过了才知道。
您需要登录后才可以回帖 登录 | 注册

本版积分规则

1478

主题

12917

帖子

55

粉丝
快速回复 在线客服 返回列表 返回顶部

1478

主题

12917

帖子

55

粉丝
快速回复 在线客服 返回列表 返回顶部