github.com/huin/mqtt项目解析
本帖最后由 keer_zu 于 2017-3-7 15:05 编辑huin/mqtt
在分析:
一起看看这个go语言实现的mqtt
这个项目的时候,引用到了上面的/huin/mqtt。这里尝试做个分析。
@21ic小喇叭 要连载了,给个推荐啊{:titter:}
@dong_abc @yyy71cj @ddllxxrr @ningling_21 @guojihongwhpu1
给我给点建议。
mqtt
An MQTT encoder and decoder,written in Golang.
This library was modified heavily from https://github.com/plucury/mqtt.go and is API-incompatible with it.
Currently the library's API is unstable.
从上面介绍可知,主要是一个mqtt的编解码器。 先看它的TEST : mqtt_test.go
package mqtt
import (
"bytes"
"io"
"reflect"
"testing"
gbt "github.com/huin/gobinarytest"
)
type fakeSizePayload int
func (p fakeSizePayload) Size() int {
return int(p)
}
func (p fakeSizePayload) WritePayload(w io.Writer) error {
return nil
}
func (p fakeSizePayload) ReadPayload(r io.Reader) error {
return nil
}
type fakeDecoderConfig struct{}
func (c fakeDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error) {
return fakeSizePayload(n), nil
}
func TestEncodeDecode(t *testing.T) {
tests := []struct {
Comment string
DecoderConfig DecoderConfig
Msg Message
Expected gbt.Matcher
NoEncodeTestbool
}{
{
Comment: "CONNECT message",
Msg: &Connect{
ProtocolName: "MQIsdp",
ProtocolVersion: 3,
UsernameFlag: true,
PasswordFlag: true,
WillRetain: false,
WillQos: 1,
WillFlag: true,
CleanSession: true,
KeepAliveTimer:10,
ClientId: "xixihaha",
WillTopic: "topic",
WillMessage: "message",
Username: "name",
Password: "pwd",
},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x10}},
gbt.Named{"Remaining length", gbt.Literal{12 + 5*2 + 8 + 5 + 7 + 4 + 3}},
// Extended headers for CONNECT:
gbt.Named{"Protocol name", gbt.InOrder{gbt.Literal{0x00, 0x06}, gbt.Literal("MQIsdp")}},
gbt.Named{
"Extended headers for CONNECT",
gbt.Literal{
0x03, // Protocol version number
0xce, // Connect flags
0x00, 0x0a, // Keep alive timer
},
},
// CONNECT payload:
gbt.Named{"Client identifier", gbt.InOrder{gbt.Literal{0x00, 0x08}, gbt.Literal("xixihaha")}},
gbt.Named{"Will topic", gbt.InOrder{gbt.Literal{0x00, 0x05}, gbt.Literal("topic")}},
gbt.Named{"Will message", gbt.InOrder{gbt.Literal{0x00, 0x07}, gbt.Literal("message")}},
gbt.Named{"Username", gbt.InOrder{gbt.Literal{0x00, 0x04}, gbt.Literal("name")}},
gbt.Named{"Password", gbt.InOrder{gbt.Literal{0x00, 0x03}, gbt.Literal("pwd")}},
},
},
{
Comment: "CONNACK message",
Msg: &ConnAck{
ReturnCode: RetCodeBadUsernameOrPassword,
},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x20}},
gbt.Named{"Remaining length", gbt.Literal{2}},
gbt.Named{"Reserved byte", gbt.Literal{0}},
gbt.Named{"Return code", gbt.Literal{4}},
},
},
{
Comment: "PUBLISH message with QoS = QosAtMostOnce",
Msg: &Publish{
Header: Header{
DupFlag:false,
QosLevel: QosAtMostOnce,
Retain: false,
},
TopicName: "a/b",
Payload: BytesPayload{1, 2, 3},
},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x30}},
gbt.Named{"Remaining length", gbt.Literal{5 + 3}},
gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
// No MessageId should be present.
gbt.Named{"Data", gbt.Literal{1, 2, 3}},
},
},
{
Comment: "PUBLISH message with QoS = QosAtLeastOnce",
Msg: &Publish{
Header: Header{
DupFlag:true,
QosLevel: QosAtLeastOnce,
Retain: false,
},
TopicName: "a/b",
MessageId: 0x1234,
Payload: BytesPayload{1, 2, 3},
},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x3a}},
gbt.Named{"Remaining length", gbt.Literal{7 + 3}},
gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
gbt.Named{"Data", gbt.Literal{1, 2, 3}},
},
},
{
Comment: "PUBLISH message with maximum size payload",
DecoderConfig: fakeDecoderConfig{},
Msg: &Publish{
Header: Header{
DupFlag:false,
QosLevel: QosAtMostOnce,
Retain: false,
},
TopicName: "a/b",
Payload: fakeSizePayload(MaxPayloadSize - 5),
},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x30}},
gbt.Named{"Remaining length", gbt.Literal{0xff, 0xff, 0xff, 0x7f}},
gbt.Named{"Topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
// Our fake payload doesn't write any data, so no data should appear here.
},
},
{
Comment: "PUBACK message",
Msg: &PubAck{MessageId: 0x1234},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x40}},
gbt.Named{"Remaining length", gbt.Literal{2}},
gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
},
},
{
Comment: "PUBREC message",
Msg: &PubRec{MessageId: 0x1234},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x50}},
gbt.Named{"Remaining length", gbt.Literal{2}},
gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
},
},
{
Comment: "PUBREL message",
Msg: &PubRel{MessageId: 0x1234},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x60}},
gbt.Named{"Remaining length", gbt.Literal{2}},
gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
},
},
{
Comment: "PUBCOMP message",
Msg: &PubComp{MessageId: 0x1234},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x70}},
gbt.Named{"Remaining length", gbt.Literal{2}},
gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
},
},
{
Comment: "SUBSCRIBE message",
Msg: &Subscribe{
Header: Header{
DupFlag:false,
QosLevel: QosAtLeastOnce,
},
MessageId: 0x4321,
Topics: []TopicQos{
{"a/b", QosAtLeastOnce},
{"c/d", QosExactlyOnce},
},
},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x82}},
gbt.Named{"Remaining length", gbt.Literal{2 + 5 + 1 + 5 + 1}},
gbt.Named{"MessageId", gbt.Literal{0x43, 0x21}},
gbt.Named{"First topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
gbt.Named{"First topic QoS", gbt.Literal{1}},
gbt.Named{"Second topic", gbt.Literal{0x00, 0x03, 'c', '/', 'd'}},
gbt.Named{"Second topic QoS", gbt.Literal{2}},
},
},
{
Comment: "SUBACK message",
Msg: &SubAck{
MessageId: 0x1234,
TopicsQos: []QosLevel{QosAtMostOnce, QosExactlyOnce},
},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x90}},
gbt.Named{"Remaining length", gbt.Literal{4}},
gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
gbt.Named{"TopicsQos", gbt.Literal{0x00, 0x02}},
},
},
{
Comment: "UNSUBSCRIBE message",
Msg: &Unsubscribe{
Header: Header{
DupFlag:false,
QosLevel: QosAtLeastOnce,
},
MessageId: 0x4321,
Topics: []string{"a/b", "c/d"},
},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0xa2}},
gbt.Named{"Remaining length", gbt.Literal{2 + 5 + 5}},
gbt.Named{"MessageId", gbt.Literal{0x43, 0x21}},
gbt.Named{"First topic", gbt.Literal{0x00, 0x03, 'a', '/', 'b'}},
gbt.Named{"Second topic", gbt.Literal{0x00, 0x03, 'c', '/', 'd'}},
},
},
{
Comment: "UNSUBACK message",
Msg: &UnsubAck{MessageId: 0x1234},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0xb0}},
gbt.Named{"Remaining length", gbt.Literal{2}},
gbt.Named{"MessageId", gbt.Literal{0x12, 0x34}},
},
},
{
Comment: "PINGREQ message",
Msg: &PingReq{},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0xc0}},
gbt.Named{"Remaining length", gbt.Literal{0}},
},
},
{
Comment: "PINGRESP message",
Msg: &PingResp{},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0xd0}},
gbt.Named{"Remaining length", gbt.Literal{0}},
},
},
{
Comment: "DISCONNECT message",
Msg: &Disconnect{},
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0xe0}},
gbt.Named{"Remaining length", gbt.Literal{0}},
},
},
}
for _, test := range tests {
{
// Test decoding.
expectedBuf := new(bytes.Buffer)
test.Expected.Write(expectedBuf)
if decodedMsg, err := DecodeOneMessage(expectedBuf, test.DecoderConfig); err != nil {
t.Errorf("%s: Unexpected error during decoding: %v", test.Comment, err)
} else if !reflect.DeepEqual(test.Msg, decodedMsg) {
t.Errorf("%s: Decoded value mismatch\n got = %#v\nexpected = %#v",
test.Comment, decodedMsg, test.Msg)
}
}
if !test.NoEncodeTest {
// Test encoding.
encodedBuf := new(bytes.Buffer)
if err := test.Msg.Encode(encodedBuf); err != nil {
t.Errorf("%s: Unexpected error during encoding: %v", test.Comment, err)
} else if err = gbt.Matches(test.Expected, encodedBuf.Bytes()); err != nil {
t.Errorf("%s: Unexpected encoding output: %v", test.Comment, err)
}
}
}
}
func TestErrorEncode(t *testing.T) {
tests := []struct {
Comment string
Msg Message
}{
{
Comment: "Payload reports Size() that's too large for MQTT payload.",
Msg: &Publish{
TopicName: "big/message",
MessageId: 0x1234,
// MaxPayloadSize-5 is too large - the payload space is further
// restricted by the variable header.
Payload: fakeSizePayload(MaxPayloadSize),
},
},
{
Comment: "Payload reports Size() that would overflow when added to variable header size.",
Msg: &Publish{
TopicName: "big/message",
MessageId: 0x1234,
Payload: fakeSizePayload(0x7fffffff),
},
},
}
for _, test := range tests {
encodedBuf := new(bytes.Buffer)
if err := test.Msg.Encode(encodedBuf); err == nil {
t.Errorf("%s: Expected error during encoding, but got nil.", test.Comment)
}
}
}
func TestErrorDecode(t *testing.T) {
tests := []struct {
Commentstring
Expected gbt.Matcher
}{
{
Comment:"Immediate EOF",
Expected: gbt.Literal{},
},
{
Comment:"EOF at 1 byte",
Expected: gbt.Literal{0x10},
},
{
Comment: "PUBACK message with too short a length",
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x40}},
gbt.Named{"Remaining length", gbt.Literal{1}},
gbt.Named{"Truncated MessageId", gbt.Literal{0x12}},
},
},
{
Comment: "PUBACK message with too long a length",
Expected: gbt.InOrder{
gbt.Named{"Header byte", gbt.Literal{0x40}},
gbt.Named{"Remaining length", gbt.Literal{3}},
gbt.Named{"Truncated MessageId", gbt.Literal{0x12, 0x34, 0x56}},
},
},
}
for _, test := range tests {
expectedBuf := new(bytes.Buffer)
test.Expected.Write(expectedBuf)
if _, err := DecodeOneMessage(expectedBuf, nil); err == nil {
t.Errorf("%s: Expected error during decoding, but got nil.", test.Comment)
}
}
}
func TestLengthEncodeDecode(t *testing.T) {
tests := []struct {
Value int32
Encoded gbt.Matcher
}{
{0, gbt.Literal{0}},
{1, gbt.Literal{1}},
{20, gbt.Literal{20}},
// Boundary conditions used as tests taken from MQTT 3.1 spec.
{0, gbt.Literal{0x00}},
{127, gbt.Literal{0x7F}},
{128, gbt.Literal{0x80, 0x01}},
{16383, gbt.Literal{0xFF, 0x7F}},
{16384, gbt.Literal{0x80, 0x80, 0x01}},
{2097151, gbt.Literal{0xFF, 0xFF, 0x7F}},
{2097152, gbt.Literal{0x80, 0x80, 0x80, 0x01}},
{268435455, gbt.Literal{0xFF, 0xFF, 0xFF, 0x7F}},
}
for _, test := range tests {
{
// Test decoding.
buf := new(bytes.Buffer)
test.Encoded.Write(buf)
buf = bytes.NewBuffer(buf.Bytes())
if result := decodeLength(buf); test.Value != result {
t.Errorf("Decoding test %#x: got %#x", test.Value, result)
}
}
{
// Test encoding.
buf := new(bytes.Buffer)
encodeLength(test.Value, buf)
if err := gbt.Matches(test.Encoded, buf.Bytes()); err != nil {
t.Errorf("Encoding test %#x: %v", test.Value, err)
}
}
}
}
type SeqBytePayload struct {
N int
T *testing.T
}
func (p *SeqBytePayload) Size() int {
return p.N
}
func (p *SeqBytePayload) WritePayload(w io.Writer) error {
buf := make([]byte, 256)
for i := range buf {
buf = byte(i)
}
for toWrite := p.N; toWrite > 0; toWrite -= 256 {
writeThisTime := toWrite
if writeThisTime > 256 {
writeThisTime = 256
}
if _, err := w.Write(buf[:writeThisTime]); err != nil {
return err
}
}
return nil
}
func (p *SeqBytePayload) ReadPayload(r io.Reader) error {
buf := make([]byte, 256)
for toRead := p.N; toRead > 0; toRead -= 256 {
readThisTime := toRead
if readThisTime > 256 {
readThisTime = 256
}
if _, err := io.ReadFull(r, buf[:readThisTime]); err != nil {
p.T.Errorf("Got unexpected error %v", err)
return err
}
for i, v := range buf[:readThisTime] {
if v != byte(i) {
p.T.Errorf("Got unexpected byte %#x, expected %#x", v, i)
return io.ErrClosedPipe
}
}
}
// Any more reads should produce an EOF.
if n, err := r.Read(buf); n > 0 {
p.T.Errorf("Got unexpected extra %d bytes of data", n)
} else if err != io.EOF {
p.T.Errorf("Expected err=EOF, got %v", err)
}
return nil
}
// Stream a reasonably large Publish payload containing sequential byte values.
func TestPipedPublish(t *testing.T) {
r, w := io.Pipe()
complete := make(chan bool)
payload := &SeqBytePayload{
N: 20 + (1 << 20),
T: t,
}
go func() {
defer func() {
complete <- true
w.Close()
}()
msg := &Publish{
TopicName: "foo",
Payload: payload,
}
if err := msg.Encode(w); err != nil {
t.Error(err)
}
}()
go func() {
defer func() {
complete <- true
r.Close()
}()
expectedMsg := &Publish{
TopicName: "foo",
Payload: payload,
}
testConfig := &ValueConfig{payload}
if msg, err := DecodeOneMessage(r, testConfig); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expectedMsg, msg) {
t.Errorf(" got = %#v\nexpected = %#v", msg, expectedMsg)
}
}()
_ = <-complete
_ = <-complete
}
本帖最后由 keer_zu 于 2017-3-7 15:16 编辑
keer_zu 发表于 2017-3-7 14:43
先看它的TEST : mqtt_test.go
第28行:func (c fakeDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)
这里涉及到一个类型 Publish,下面就先来分析一下这个类型:
// Publish represents an MQTT PUBLISH message.
type Publish struct {
Header
TopicName string
MessageId uint16
Payload Payload
}
上面是头。
3.3.2 可变报头可变报头按顺序包含主题名和报文标识符。
主题名 Topic Name主题名(Topic Name)用于识别有效载荷数据应该被发布到哪一个信息通道。
主题名必须是PUBLISH报文可变报头的第一个字段。它必须是 1.5.3节定义的UTF-8编码的字符串 。
PUBLISH报文中的主题名不能包含通配符 。
服务端发送给订阅客户端的PUBLISH报文的主题名必须匹配该订阅的主题过滤器(根据 4.7节定义的匹配过程)。
报文标识符 Packet Identifier只有当QoS等级是1或2时,报文标识符(Packet Identifier)字段才能出现在PUBLISH报文中。2.3.1节提供了有关报文标识符的更多信息。
可变报头非规范示例图例 3.11 – PUBLISH报文可变报头非规范示例 举例说明了 表格 3.3 - PUBLISH报文非规范示例 中简要描述的PUBLISH报文的可变报头。
表格 3.3 - PUBLISH报文非规范示例
FieldValue
主题名a/b
报文标识符10
keer_zu 发表于 2017-3-7 15:14
第28行:func (c fakeDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)
...
有效载荷
有效载荷包含将被发布的应用消息。数据的内容和格式是应用特定的。有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度。包含零长度有效载荷的PUBLISH报文是合法的。
以上就是publish的三部分组成。
func (msg *Publish) Encode(w io.Writer) (err error) {
buf := new(bytes.Buffer)
setString(msg.TopicName, buf)
if msg.Header.QosLevel.HasId() {
setUint16(msg.MessageId, buf)
}
if err = writeMessage(w, MsgPublish, &msg.Header, buf, int32(msg.Payload.Size())); err != nil {
return
}
return msg.Payload.WritePayload(w)
}
func (msg *Publish) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error) {
defer func() {
err = recoverError(err, recover())
}()
msg.Header = hdr
msg.TopicName = getString(r, &packetRemaining)
if msg.Header.QosLevel.HasId() {
msg.MessageId = getUint16(r, &packetRemaining)
}
payloadReader := &io.LimitedReader{r, int64(packetRemaining)}
if msg.Payload, err = config.MakePayload(msg, payloadReader, int(packetRemaining)); err != nil {
return
}
return msg.Payload.ReadPayload(payloadReader)
}
该类型支持这两者方法,分别是编码和解码。
yyy71cj 发表于 2017-3-7 15:18
本想给你加个精的,一看是他人板块……
{:lol:} yyy71cj 发表于 2017-3-7 17:03
可以了解一下mqtt 可以看到整个message.go就是对:
3.0 Contents – MQTT控制报文
3.1 CONNECT – 连接服务端
3.2 CONNACK – 确认连接请求
3.3 PUBLISH – 发布消息
3.4 PUBACK –发布确认
3.5 PUBREC – 发布收到(QoS 2,第一步)
3.6 PUBREL – 发布释放(QoS 2,第二步)
3.7 PUBCOMP – 发布完成(QoS 2,第三步)
3.8 SUBSCRIBE - 订阅主题
3.9 SUBACK – 订阅确认
3.10 UNSUBSCRIBE –取消订阅
3.11 UNSUBACK – 取消订阅确认
3.12 PINGREQ – 心跳请求
3.13 PINGRESP – 心跳响应
3.14 DISCONNECT –断开连接
这些消息的编解码。 encodeing.go 只是最基本的运算和转换
package mqtt
import (
"bytes"
"io"
)
func getUint8(r io.Reader, packetRemaining *int32) uint8 {
if *packetRemaining < 1 {
raiseError(dataExceedsPacketError)
}
var b byte
if _, err := io.ReadFull(r, b[:]); err != nil {
raiseError(err)
}
*packetRemaining--
return b
}
func getUint16(r io.Reader, packetRemaining *int32) uint16 {
if *packetRemaining < 2 {
raiseError(dataExceedsPacketError)
}
var b byte
if _, err := io.ReadFull(r, b[:]); err != nil {
raiseError(err)
}
*packetRemaining -= 2
return uint16(b)<<8 | uint16(b)
}
func getString(r io.Reader, packetRemaining *int32) string {
strLen := int(getUint16(r, packetRemaining))
if int(*packetRemaining) < strLen {
raiseError(dataExceedsPacketError)
}
b := make([]byte, strLen)
if _, err := io.ReadFull(r, b); err != nil {
raiseError(err)
}
*packetRemaining -= int32(strLen)
return string(b)
}
func setUint8(val uint8, buf *bytes.Buffer) {
buf.WriteByte(byte(val))
}
func setUint16(val uint16, buf *bytes.Buffer) {
buf.WriteByte(byte(val & 0xff00 >> 8))
buf.WriteByte(byte(val & 0x00ff))
}
func setString(val string, buf *bytes.Buffer) {
length := uint16(len(val))
setUint16(length, buf)
buf.WriteString(val)
}
func boolToByte(val bool) byte {
if val {
return byte(1)
}
return byte(0)
}
func decodeLength(r io.Reader) int32 {
var v int32
var buf byte
var shift uint
for i := 0; i < 4; i++ {
if _, err := io.ReadFull(r, buf[:]); err != nil {
raiseError(err)
}
b := buf
v |= int32(b&0x7f) << shift
if b&0x80 == 0 {
return v
}
shift += 7
}
raiseError(badLengthEncodingError)
panic("unreachable")
}
func encodeLength(length int32, buf *bytes.Buffer) {
if length == 0 {
buf.WriteByte(0)
return
}
for length > 0 {
digit := length & 0x7f
length = length >> 7
if length > 0 {
digit = digit | 0x80
}
buf.WriteByte(byte(digit))
}
}
payload.go 很短,定义了Payload接口,同时实现了BytesPayload和StreamedPayload两种类型。
他们都可以以这个接口调用,具体对外的方法有:
Size()----- 返回Payload的长度。
WritePayload() ------ 写Payload
ReadPayload() ------ 读Payload
代码如下:
package mqtt
import (
"io"
)
// Payload is the interface for Publish payloads. Typically the BytesPayload
// implementation will be sufficient for small payloads whose full contents
// will exist in memory. However, other implementations can read or write
// payloads requiring them holding their complete contents in memory.
type Payload interface {
// Size returns the number of bytes that WritePayload will write.
Size() int
// WritePayload writes the payload data to w. Implementations must write
// Size() bytes of data, but it is *not* required to do so prior to
// returning. Size() bytes must have been written to w prior to another
// message being encoded to the underlying connection.
WritePayload(w io.Writer) error
// ReadPayload reads the payload data from r (r will EOF at the end of the
// payload). It is *not* required for r to have been consumed prior to this
// returning. r must have been consumed completely prior to another message
// being decoded from the underlying connection.
ReadPayload(r io.Reader) error
}
// BytesPayload reads/writes a plain slice of bytes.
type BytesPayload []byte
func (p BytesPayload) Size() int {
return len(p)
}
func (p BytesPayload) WritePayload(w io.Writer) error {
_, err := w.Write(p)
return err
}
func (p BytesPayload) ReadPayload(r io.Reader) error {
_, err := io.ReadFull(r, p)
return err
}
// StreamedPayload writes payload data from reader, or reads payload data into a writer.
type StreamedPayload struct {
// N indicates payload size to the encoder. This many bytes will be read from
// the reader when encoding. The number of bytes in the payload will be
// stored here when decoding.
N int
// EncodingSource is used to copy data from when encoding a Publish message
// onto the wire. This can be
EncodingSource io.Reader
// DecodingSink is used to copy data to when decoding a Publish message from
// the wire. This can be nil if the payload is only being used for encoding.
DecodingSink io.Writer
}
func (p *StreamedPayload) Size() int {
return p.N
}
func (p *StreamedPayload) WritePayload(w io.Writer) error {
_, err := io.CopyN(w, p.EncodingSource, int64(p.N))
return err
}
func (p *StreamedPayload) ReadPayload(r io.Reader) error {
n, err := io.Copy(p.DecodingSink, r)
p.N = int(n)
return err
}
再来看看mqtt.go
他是在messages.go和payload.go等基础上 的再封装,可以从前面注释和后面代码看出:
他主要提供了两个API:
1. func DecodeOneMessage(r io.Reader, config DecoderConfig) (msg Message, err error)
2. func NewMessage(msgType MessageType) (msg Message, err error)
分别用来解析一条mqtt消息和构建一条mqtt消息。至此,这个project就很明显了,它只提供mqtt消息的处理。
然后让我们再回到之前调用他的那个project: 一起看看这个go语言实现的mqtt
// Implementation of MQTT V3.1 encoding and decoding.
//
// See http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
// for the MQTT protocol specification. This package does not implement the
// semantics of MQTT, but purely the encoding and decoding of its messages.
//
// Decoding Messages:
//
// Use the DecodeOneMessage function to read a Message from an io.Reader, it
// will return a Message value. The function can be implemented using the public
// API of this package if more control is required. For example:
//
// for {
// msg, err := mqtt.DecodeOneMessage(conn, nil)
// if err != nil {
// // handle err
// }
// switch msg := msg.(type) {
// case *Connect:
// // ...
// case *Publish:
// // ...
// // etc.
// }
// }
//
// Encoding Messages:
//
// Create a message value, and use its Encode method to write it to an
// io.Writer. For example:
//
// someData := []byte{1, 2, 3}
// msg := &Publish{
// Header: {
// DupFlag: false,
// QosLevel: QosAtLeastOnce,
// Retain: false,
// },
// TopicName: "a/b",
// MessageId: 10,
// Payload: BytesPayload(someData),
// }
// if err := msg.Encode(conn); err != nil {
// // handle err
// }
//
// Advanced PUBLISH payload handling:
//
// The default behaviour for decoding PUBLISH payloads, and most common way to
// supply payloads for encoding, is the BytesPayload, which is a []byte
// derivative.
//
// More complex handling is possible by implementing the Payload interface,
// which can be injected into DecodeOneMessage via the `config` parameter, or
// into an outgoing Publish message via its Payload field.Potential benefits
// of this include:
//
// * Data can be (un)marshalled directly on a connection, without an unecessary
// round-trip via bytes.Buffer.
//
// * Data can be streamed directly on readers/writers (e.g files, other
// connections, pipes) without the requirement to buffer an entire message
// payload in memory at once.
//
// The limitations of these streaming features are:
//
// * When encoding a payload, the encoded size of the payload must be known and
// declared upfront.
//
// * The payload size (and PUBLISH variable header) can be no more than 256MiB
// minus 1 byte. This is a specified limitation of MQTT v3.1 itself.
package mqtt
import (
"errors"
"io"
)
var (
badMsgTypeError = errors.New("mqtt: message type is invalid")
badQosError = errors.New("mqtt: QoS is invalid")
badWillQosError = errors.New("mqtt: will QoS is invalid")
badLengthEncodingError = errors.New("mqtt: remaining length field exceeded maximum of 4 bytes")
badReturnCodeError = errors.New("mqtt: is invalid")
dataExceedsPacketError = errors.New("mqtt: data exceeds packet length")
msgTooLongError = errors.New("mqtt: message is too long")
)
const (
QosAtMostOnce = QosLevel(iota)
QosAtLeastOnce
QosExactlyOnce
qosFirstInvalid
)
type QosLevel uint8
func (qos QosLevel) IsValid() bool {
return qos < qosFirstInvalid
}
func (qos QosLevel) HasId() bool {
return qos == QosAtLeastOnce || qos == QosExactlyOnce
}
const (
RetCodeAccepted = ReturnCode(iota)
RetCodeUnacceptableProtocolVersion
RetCodeIdentifierRejected
RetCodeServerUnavailable
RetCodeBadUsernameOrPassword
RetCodeNotAuthorized
retCodeFirstInvalid
)
type ReturnCode uint8
func (rc ReturnCode) IsValid() bool {
return rc >= RetCodeAccepted && rc < retCodeFirstInvalid
}
// DecoderConfig provides configuration for decoding messages.
type DecoderConfig interface {
// MakePayload returns a Payload for the given Publish message. r is a Reader
// that will read the payload data, and n is the number of bytes in the
// payload. The Payload.ReadPayload method is called on the returned payload
// by the decoding process.
MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)
}
type DefaultDecoderConfig struct{}
func (c DefaultDecoderConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error) {
return make(BytesPayload, n), nil
}
// ValueConfig always returns the given Payload when MakePayload is called.
type ValueConfig struct {
Payload Payload
}
func (c *ValueConfig) MakePayload(msg *Publish, r io.Reader, n int) (Payload, error) {
return c.Payload, nil
}
// DecodeOneMessage decodes one message from r. config provides specifics on
// how to decode messages, nil indicates that the DefaultDecoderConfig should
// be used.
func DecodeOneMessage(r io.Reader, config DecoderConfig) (msg Message, err error) {
var hdr Header
var msgType MessageType
var packetRemaining int32
msgType, packetRemaining, err = hdr.Decode(r)
if err != nil {
return
}
msg, err = NewMessage(msgType)
if err != nil {
return
}
if config == nil {
config = DefaultDecoderConfig{}
}
return msg, msg.Decode(r, hdr, packetRemaining, config)
}
// NewMessage creates an instance of a Message value for the given message
// type. An error is returned if msgType is invalid.
func NewMessage(msgType MessageType) (msg Message, err error) {
switch msgType {
case MsgConnect:
msg = new(Connect)
case MsgConnAck:
msg = new(ConnAck)
case MsgPublish:
msg = new(Publish)
case MsgPubAck:
msg = new(PubAck)
case MsgPubRec:
msg = new(PubRec)
case MsgPubRel:
msg = new(PubRel)
case MsgPubComp:
msg = new(PubComp)
case MsgSubscribe:
msg = new(Subscribe)
case MsgUnsubAck:
msg = new(UnsubAck)
case MsgSubAck:
msg = new(SubAck)
case MsgUnsubscribe:
msg = new(Unsubscribe)
case MsgPingReq:
msg = new(PingReq)
case MsgPingResp:
msg = new(PingResp)
case MsgDisconnect:
msg = new(Disconnect)
default:
return nil, badMsgTypeError
}
return
}
// panicErr wraps an error that caused a problem that needs to bail out of the
// API, such that errors can be recovered and returned as errors from the
// public API.
type panicErr struct {
err error
}
func (p panicErr) Error() string {
return p.err.Error()
}
func raiseError(err error) {
panic(panicErr{err})
}
// recoverError recovers any panic in flight and, iff it's an error from
// raiseError, will return the error. Otherwise re-raises the panic value.
// If no panic is in flight, it returns existingErr.
//
// This must be used in combination with a defer in all public API entry
// points where raiseError could be called.
func recoverError(existingErr error, recovered interface{}) error {
if recovered != nil {
if pErr, ok := recovered.(panicErr); ok {
return pErr.err
} else {
panic(recovered)
}
}
return existingErr
} 回头再看看mqtt_test.go ,就是对所有mqtt消息的decode和encode进行测试,调用mqtt.go中的API
至此这个github托管小项目就很清楚了。 yyy71cj 发表于 2017-3-8 08:06
全是代码,不知道爱代码的人感觉如何
肯定喜欢得不要不要的{:lol:} yyy71cj 发表于 2017-3-8 13:10
这里,代码控还是很多的 yyy71cj 发表于 2017-3-8 15:13
说不定有人会喜欢得……要啊……要啊……的
{:sweat:} 余兄,此情此景求分享一篇原创小黄文。 yyy71cj 发表于 2017-3-8 15:56
想写的,就是不能当饭吃,我要先混口饭吃呢……
这个比写书挣钱{:titter:} 淡定,淡定,淡定……
yyy71cj 发表于 2017-3-8 16:01
是么?我得考虑考虑……
mosquitto yyy71cj 发表于 2017-6-13 21:27
这个做mqtt服务器确实不错。 yyy71cj 发表于 2017-6-14 10:14
还没涉足这个领域,所以你突然说个不错,我不知道从哪里看过去…… ...
用过了才知道。
页:
[1]
2