一起看看这个go语言实现的mqtt

[复制链接]
12086|51
 楼主 | 2017-3-3 09:15 | 显示全部楼层 |阅读模式
本帖最后由 keer_zu 于 2017-3-7 15:07 编辑

mqtt.git

github链接


协议分析

MQTT协议中文版by mcxiaoke
最新版本: v1.0.3 2016.02.06
文档地址概述MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。
说明
MQTT英文原版协议提供了Word格式和HTML格式,我翻译的时候用的Word文档,之前一直提供的是Word文档转换的HTML和PDF共浏览和下载,最近花时间整理了Markdown版本,可以更方便的分章节在线浏览了,转换为Markdown后部分表格的格式不太对,会逐步用图片代替。
目录发现任何翻译问题或格式问题欢迎提PR帮忙完善。



上面所列举go实现的有关说明:

mqttMQTT Clients, Servers and Load Testers in Go
For docs, see: http://godoc.org/github.com/jeffallen/mqtt
For a little discussion of this code see: http://blog.nella.org/mqtt-code-golf
LimitationsAt this time, the following limitations apply:
  • QoS level 0 only; messages are only stored in RAM
  • Retain works, but at QoS level 0. Retained messages are lost on server restart.
  • Last will messages are not implemented.
  • Keepalive and timeouts are not implemented.
ServersThe example MQTT servers are in directories mqttsrv and smqttsrv (secured with TLS).
Benchmarking ToolsTo use the benchmarking tools, cd into pingtest, loadtest, or many and type "go build". The tools have reasonable defaults, but you'll also want to use the -help flag to find out what can be tuned.
All benchmarks suck, and these three suck in different ways.
pingtest simulates a number of pairs of clients who are bouncing messages between them as fast as possible. It aims to measure latency of messages through the system when under load.
loadtest simulates a number of pairs of clients where one is sending as fast as possible to the other. Realistically, this ends up testing the ability of the system to decode and queue messages, because any slight inbalance in scheduling of readers causes a pile up of messages from the writer slamming them down the throat of the server.
many simulates a large number of clients who send a low transaction rate. The goal is to eventually use this to achieve 1 million (and more?) concurrent, active MQTT sessions in one server. So far, mqttsrv has survived a load of 40k concurrent connections from many.



@21ic小喇叭 这个也是

@21小跑堂
@ddllxxrr @dong_abc @guojihongwhpu1 @ningling_21 @yyy71cj @renxiaolin ........
 楼主 | 2017-3-7 10:55 | 显示全部楼层
主要实现都在这个文件: mqtt/mqtt.go

使用特权

评论回复
 楼主 | 2017-3-7 10:57 | 显示全部楼层
文件太长,下面是钱368行:
  1. // Package mqtt implements MQTT clients and servers.
  2. package mqtt

  3. import (
  4.         crand "crypto/rand"
  5.         "errors"
  6.         "fmt"
  7.         "io"
  8.         "log"
  9.         "math/rand"
  10.         "net"
  11.         "runtime"
  12.         "strings"
  13.         "sync"
  14.         "sync/atomic"
  15.         "time"

  16.         proto "github.com/huin/mqtt"
  17. )

  18. // A random number generator ready to make client-id's, if
  19. // they do not provide them to us.
  20. var cliRand *rand.Rand

  21. func init() {
  22.         var seed int64
  23.         var sb [4]byte
  24.         crand.Read(sb[:])
  25.         seed = int64(time.Now().Nanosecond())<<32 |
  26.                 int64(sb[0])<<24 | int64(sb[1])<<16 |
  27.                 int64(sb[2])<<8 | int64(sb[3])
  28.         cliRand = rand.New(rand.NewSource(seed))
  29. }

  30. type stats struct {
  31.         recv       int64
  32.         sent       int64
  33.         clients    int64
  34.         clientsMax int64
  35.         lastmsgs   int64
  36. }

  37. func (s *stats) messageRecv()      { atomic.AddInt64(&s.recv, 1) }
  38. func (s *stats) messageSend()      { atomic.AddInt64(&s.sent, 1) }
  39. func (s *stats) clientConnect()    { atomic.AddInt64(&s.clients, 1) }
  40. func (s *stats) clientDisconnect() { atomic.AddInt64(&s.clients, -1) }

  41. func statsMessage(topic string, stat int64) *proto.Publish {
  42.         return &proto.Publish{
  43.                 Header:    header(dupFalse, proto.QosAtMostOnce, retainTrue),
  44.                 TopicName: topic,
  45.                 Payload:   newIntPayload(stat),
  46.         }
  47. }

  48. func (s *stats) publish(sub *subscriptions, interval time.Duration) {
  49.         clients := atomic.LoadInt64(&s.clients)
  50.         clientsMax := atomic.LoadInt64(&s.clientsMax)
  51.         if clients > clientsMax {
  52.                 clientsMax = clients
  53.                 atomic.StoreInt64(&s.clientsMax, clientsMax)
  54.         }
  55.         sub.submit(nil, statsMessage("$SYS/broker/clients/active", clients))
  56.         sub.submit(nil, statsMessage("$SYS/broker/clients/maximum", clientsMax))
  57.         sub.submit(nil, statsMessage("$SYS/broker/messages/received",
  58.                 atomic.LoadInt64(&s.recv)))
  59.         sub.submit(nil, statsMessage("$SYS/broker/messages/sent",
  60.                 atomic.LoadInt64(&s.sent)))

  61.         msgs := atomic.LoadInt64(&s.recv) + atomic.LoadInt64(&s.sent)
  62.         msgpersec := (msgs - s.lastmsgs) / int64(interval/time.Second)
  63.         // no need for atomic because we are the only reader/writer of it
  64.         s.lastmsgs = msgs

  65.         sub.submit(nil, statsMessage("$SYS/broker/messages/per-sec", msgpersec))
  66. }

  67. // An intPayload implements proto.Payload, and is an int64 that
  68. // formats itself and then prints itself into the payload.
  69. type intPayload string

  70. func newIntPayload(i int64) intPayload {
  71.         return intPayload(fmt.Sprint(i))
  72. }
  73. func (ip intPayload) ReadPayload(r io.Reader) error {
  74.         // not implemented
  75.         return nil
  76. }
  77. func (ip intPayload) WritePayload(w io.Writer) error {
  78.         _, err := w.Write([]byte(string(ip)))
  79.         return err
  80. }
  81. func (ip intPayload) Size() int {
  82.         return len(ip)
  83. }

  84. // A retain holds information necessary to correctly manage retained
  85. // messages.
  86. //
  87. // This needs to hold copies of the proto.Publish, not pointers to
  88. // it, or else we can send out one with the wrong retain flag.
  89. type retain struct {
  90.         m    proto.Publish
  91.         wild wild
  92. }

  93. type subscriptions struct {
  94.         workers int
  95.         posts   chan post

  96.         mu        sync.Mutex // guards access to fields below
  97.         subs      map[string][]*incomingConn
  98.         wildcards []wild
  99.         retain    map[string]retain
  100.         stats     *stats
  101. }

  102. // The length of the queue that subscription processing
  103. // workers are taking from.
  104. const postQueue = 100

  105. func newSubscriptions(workers int) *subscriptions {
  106.         s := &subscriptions{
  107.                 subs:    make(map[string][]*incomingConn),
  108.                 retain:  make(map[string]retain),
  109.                 posts:   make(chan post, postQueue),
  110.                 workers: workers,
  111.         }
  112.         for i := 0; i < s.workers; i++ {
  113.                 go s.run(i)
  114.         }
  115.         return s
  116. }

  117. func (s *subscriptions) sendRetain(topic string, c *incomingConn) {
  118.         s.mu.Lock()
  119.         var tlist []string
  120.         if isWildcard(topic) {

  121.                 // TODO: select matching topics from the retain map
  122.         } else {
  123.                 tlist = []string{topic}
  124.         }
  125.         for _, t := range tlist {
  126.                 if r, ok := s.retain[t]; ok {
  127.                         c.submit(&r.m)
  128.                 }
  129.         }
  130.         s.mu.Unlock()
  131. }

  132. func (s *subscriptions) add(topic string, c *incomingConn) {
  133.         s.mu.Lock()
  134.         defer s.mu.Unlock()
  135.         if isWildcard(topic) {
  136.                 w := newWild(topic, c)
  137.                 if w.valid() {
  138.                         s.wildcards = append(s.wildcards, w)
  139.                 }
  140.         } else {
  141.                 s.subs[topic] = append(s.subs[topic], c)
  142.         }
  143. }

  144. type wild struct {
  145.         wild []string
  146.         c    *incomingConn
  147. }

  148. func newWild(topic string, c *incomingConn) wild {
  149.         return wild{wild: strings.Split(topic, "/"), c: c}
  150. }

  151. func (w wild) matches(parts []string) bool {
  152.         i := 0
  153.         for i < len(parts) {
  154.                 // topic is longer, no match
  155.                 if i >= len(w.wild) {
  156.                         return false
  157.                 }
  158.                 // matched up to here, and now the wildcard says "all others will match"
  159.                 if w.wild[i] == "#" {
  160.                         return true
  161.                 }
  162.                 // text does not match, and there wasn't a + to excuse it
  163.                 if parts[i] != w.wild[i] && w.wild[i] != "+" {
  164.                         return false
  165.                 }
  166.                 i++
  167.         }

  168.         // make finance/stock/ibm/# match finance/stock/ibm
  169.         if i == len(w.wild)-1 && w.wild[len(w.wild)-1] == "#" {
  170.                 return true
  171.         }

  172.         if i == len(w.wild) {
  173.                 return true
  174.         }
  175.         return false
  176. }

  177. // Find all connections that are subscribed to this topic.
  178. func (s *subscriptions) subscribers(topic string) []*incomingConn {
  179.         s.mu.Lock()
  180.         defer s.mu.Unlock()

  181.         // non-wildcard subscribers
  182.         res := s.subs[topic]

  183.         // process wildcards
  184.         parts := strings.Split(topic, "/")
  185.         for _, w := range s.wildcards {
  186.                 if w.matches(parts) {
  187.                         res = append(res, w.c)
  188.                 }
  189.         }

  190.         return res
  191. }

  192. // Remove all subscriptions that refer to a connection.
  193. func (s *subscriptions) unsubAll(c *incomingConn) {
  194.         s.mu.Lock()
  195.         for _, v := range s.subs {
  196.                 for i := range v {
  197.                         if v[i] == c {
  198.                                 v[i] = nil
  199.                         }
  200.                 }
  201.         }

  202.         // remove any associated entries in the wildcard list
  203.         var wildNew []wild
  204.         for i := 0; i < len(s.wildcards); i++ {
  205.                 if s.wildcards[i].c != c {
  206.                         wildNew = append(wildNew, s.wildcards[i])
  207.                 }
  208.         }
  209.         s.wildcards = wildNew

  210.         s.mu.Unlock()
  211. }

  212. // Remove the subscription to topic for a given connection.
  213. func (s *subscriptions) unsub(topic string, c *incomingConn) {
  214.         s.mu.Lock()
  215.         if subs, ok := s.subs[topic]; ok {
  216.                 nils := 0

  217.                 // Search the list, removing references to our connection.
  218.                 // At the same time, count the nils to see if this list is now empty.
  219.                 for i := 0; i < len(subs); i++ {
  220.                         if subs[i] == c {
  221.                                 subs[i] = nil
  222.                         }
  223.                         if subs[i] == nil {
  224.                                 nils++
  225.                         }
  226.                 }

  227.                 if nils == len(subs) {
  228.                         delete(s.subs, topic)
  229.                 }
  230.         }
  231.         s.mu.Unlock()
  232. }

  233. // The subscription processing worker.
  234. func (s *subscriptions) run(id int) {
  235.         tag := fmt.Sprintf("worker %d ", id)
  236.         log.Print(tag, "started")
  237.         for post := range s.posts {
  238.                 // Remember the original retain setting, but send out immediate
  239.                 // copies without retain: "When a server sends a PUBLISH to a client
  240.                 // as a result of a subscription that already existed when the
  241.                 // original PUBLISH arrived, the Retain flag should not be set,
  242.                 // regardless of the Retain flag of the original PUBLISH.
  243.                 isRetain := post.m.Header.Retain
  244.                 post.m.Header.Retain = false

  245.                 // Handle "retain with payload size zero = delete retain".
  246.                 // Once the delete is done, return instead of continuing.
  247.                 if isRetain && post.m.Payload.Size() == 0 {
  248.                         s.mu.Lock()
  249.                         delete(s.retain, post.m.TopicName)
  250.                         s.mu.Unlock()
  251.                         return
  252.                 }

  253.                 // Find all the connections that should be notified of this message.
  254.                 conns := s.subscribers(post.m.TopicName)

  255.                 // Queue the outgoing messages
  256.                 for _, c := range conns {
  257.                         // Do not echo messages back to where they came from.
  258.                         if c == post.c {
  259.                                 continue
  260.                         }

  261.                         if c != nil {
  262.                                 c.submit(post.m)
  263.                         }
  264.                 }

  265.                 if isRetain {
  266.                         s.mu.Lock()
  267.                         // Save a copy of it, and set that copy's Retain to true, so that
  268.                         // when we send it out later we notify new subscribers that this
  269.                         // is an old message.
  270.                         msg := *post.m
  271.                         msg.Header.Retain = true
  272.                         s.retain[post.m.TopicName] = retain{m: msg}
  273.                         s.mu.Unlock()
  274.                 }
  275.         }
  276. }

  277. func (s *subscriptions) submit(c *incomingConn, m *proto.Publish) {
  278.         s.posts <- post{c: c, m: m}
  279. }

  280. // A post is a unit of work for the subscription processing workers.
  281. type post struct {
  282.         c *incomingConn
  283.         m *proto.Publish
  284. }

  285. // A Server holds all the state associated with an MQTT server.
  286. type Server struct {
  287.         l             net.Listener
  288.         subs          *subscriptions
  289.         stats         *stats
  290.         Done          chan struct{}
  291.         StatsInterval time.Duration // Defaults to 10 seconds. Must be set using sync/atomic.StoreInt64().
  292.         Dump          bool          // When true, dump the messages in and out.
  293.         rand          *rand.Rand
  294. }

  295. // NewServer creates a new MQTT server, which accepts connections from
  296. // the given listener. When the server is stopped (for instance by
  297. // another goroutine closing the net.Listener), channel Done will become
  298. // readable.
  299. func NewServer(l net.Listener) *Server {
  300.         svr := &Server{
  301.                 l:             l,
  302.                 stats:         &stats{},
  303.                 Done:          make(chan struct{}),
  304.                 StatsInterval: time.Second * 10,
  305.                 subs:          newSubscriptions(runtime.GOMAXPROCS(0)),
  306.         }

  307.         // start the stats reporting goroutine
  308.         go func() {
  309.                 for {
  310.                         svr.stats.publish(svr.subs, svr.StatsInterval)
  311.                         select {
  312.                         case <-svr.Done:
  313.                                 return
  314.                         default:
  315.                                 // keep going
  316.                         }
  317.                         time.Sleep(svr.StatsInterval)
  318.                 }
  319.         }()

  320.         return svr
  321. }
复制代码

使用特权

评论回复
| 2017-3-7 10:57 | 显示全部楼层
慢慢玩吧,有你爽的时候

使用特权

评论回复
 楼主 | 2017-3-7 10:57 | 显示全部楼层
接下来是后半部分:
  1. // Start makes the Server start accepting and handling connections.
  2. func (s *Server) Start() {
  3.         go func() {
  4.                 for {
  5.                         conn, err := s.l.Accept()
  6.                         if err != nil {
  7.                                 log.Print("Accept: ", err)
  8.                                 break
  9.                         }

  10.                         cli := s.newIncomingConn(conn)
  11.                         s.stats.clientConnect()
  12.                         cli.start()
  13.                 }
  14.                 close(s.Done)
  15.         }()
  16. }

  17. // An IncomingConn represents a connection into a Server.
  18. type incomingConn struct {
  19.         svr      *Server
  20.         conn     net.Conn
  21.         jobs     chan job
  22.         clientid string
  23.         Done     chan struct{}
  24. }

  25. var clients = make(map[string]*incomingConn)
  26. var clientsMu sync.Mutex

  27. const sendingQueueLength = 10000

  28. // newIncomingConn creates a new incomingConn associated with this
  29. // server. The connection becomes the property of the incomingConn
  30. // and should not be touched again by the caller until the Done
  31. // channel becomes readable.
  32. func (s *Server) newIncomingConn(conn net.Conn) *incomingConn {
  33.         return &incomingConn{
  34.                 svr:  s,
  35.                 conn: conn,
  36.                 jobs: make(chan job, sendingQueueLength),
  37.                 Done: make(chan struct{}),
  38.         }
  39. }

  40. type receipt chan struct{}

  41. // Wait for the receipt to indicate that the job is done.
  42. func (r receipt) wait() {
  43.         // TODO: timeout
  44.         <-r
  45. }

  46. type job struct {
  47.         m proto.Message
  48.         r receipt
  49. }

  50. // Start reading and writing on this connection.
  51. func (c *incomingConn) start() {
  52.         go c.reader()
  53.         go c.writer()
  54. }

  55. // Add this        connection to the map, or find out that an existing connection
  56. // already exists for the same client-id.
  57. func (c *incomingConn) add() *incomingConn {
  58.         clientsMu.Lock()
  59.         defer clientsMu.Unlock()

  60.         existing, ok := clients[c.clientid]
  61.         if !ok {
  62.                 // this client id already exists, return it
  63.                 return existing
  64.         }

  65.         clients[c.clientid] = c
  66.         return nil
  67. }

  68. // Delete a connection; the conection must be closed by the caller first.
  69. func (c *incomingConn) del() {
  70.         clientsMu.Lock()
  71.         defer clientsMu.Unlock()
  72.         delete(clients, c.clientid)
  73.         return
  74. }

  75. // Replace any existing connection with this one. The one to be replaced,
  76. // if any, must be closed first by the caller.
  77. func (c *incomingConn) replace() {
  78.         clientsMu.Lock()
  79.         defer clientsMu.Unlock()

  80.         // Check that any existing connection is already closed.
  81.         existing, ok := clients[c.clientid]
  82.         if ok {
  83.                 die := false
  84.                 select {
  85.                 case _, ok := <-existing.jobs:
  86.                         // what? we are expecting that this channel is closed!
  87.                         if ok {
  88.                                 die = true
  89.                         }
  90.                 default:
  91.                         die = true
  92.                 }
  93.                 if die {
  94.                         panic("attempting to replace a connection that is not closed")
  95.                 }

  96.                 delete(clients, c.clientid)
  97.         }

  98.         clients[c.clientid] = c
  99.         return
  100. }

  101. // Queue a message; no notification of sending is done.
  102. func (c *incomingConn) submit(m proto.Message) {
  103.         j := job{m: m}
  104.         select {
  105.         case c.jobs <- j:
  106.         default:
  107.                 log.Print(c, ": failed to submit message")
  108.         }
  109.         return
  110. }

  111. func (c *incomingConn) String() string {
  112.         return fmt.Sprintf("{IncomingConn: %v}", c.clientid)
  113. }

  114. // Queue a message, returns a channel that will be readable
  115. // when the message is sent.
  116. func (c *incomingConn) submitSync(m proto.Message) receipt {
  117.         j := job{m: m, r: make(receipt)}
  118.         c.jobs <- j
  119.         return j.r
  120. }

  121. func (c *incomingConn) reader() {
  122.         // On exit, close the connection and arrange for the writer to exit
  123.         // by closing the output channel.
  124.         defer func() {
  125.                 c.conn.Close()
  126.                 c.svr.stats.clientDisconnect()
  127.                 close(c.jobs)
  128.         }()

  129.         for {
  130.                 // TODO: timeout (first message and/or keepalives)
  131.                 m, err := proto.DecodeOneMessage(c.conn, nil)
  132.                 if err != nil {
  133.                         if err == io.EOF {
  134.                                 return
  135.                         }
  136.                         if strings.HasSuffix(err.Error(), "use of closed network connection") {
  137.                                 return
  138.                         }
  139.                         log.Print("reader: ", err)
  140.                         return
  141.                 }
  142.                 c.svr.stats.messageRecv()

  143.                 if c.svr.Dump {
  144.                         log.Printf("dump  in: %T", m)
  145.                 }

  146.                 switch m := m.(type) {
  147.                 case *proto.Connect:
  148.                         rc := proto.RetCodeAccepted

  149.                         if m.ProtocolName != "MQIsdp" ||
  150.                                 m.ProtocolVersion != 3 {
  151.                                 log.Print("reader: reject connection from ", m.ProtocolName, " version ", m.ProtocolVersion)
  152.                                 rc = proto.RetCodeUnacceptableProtocolVersion
  153.                         }

  154.                         // Check client id.
  155.                         if len(m.ClientId) < 1 || len(m.ClientId) > 23 {
  156.                                 rc = proto.RetCodeIdentifierRejected
  157.                         }
  158.                         c.clientid = m.ClientId

  159.                         // Disconnect existing connections.
  160.                         if existing := c.add(); existing != nil {
  161.                                 disconnect := &proto.Disconnect{}
  162.                                 r := existing.submitSync(disconnect)
  163.                                 r.wait()
  164.                                 existing.del()
  165.                         }
  166.                         c.add()

  167.                         // TODO: Last will

  168.                         connack := &proto.ConnAck{
  169.                                 ReturnCode: rc,
  170.                         }
  171.                         c.submit(connack)

  172.                         // close connection if it was a bad connect
  173.                         if rc != proto.RetCodeAccepted {
  174.                                 log.Printf("Connection refused for %v: %v", c.conn.RemoteAddr(), ConnectionErrors[rc])
  175.                                 return
  176.                         }

  177.                         // Log in mosquitto format.
  178.                         clean := 0
  179.                         if m.CleanSession {
  180.                                 clean = 1
  181.                         }
  182.                         log.Printf("New client connected from %v as %v (c%v, k%v).", c.conn.RemoteAddr(), c.clientid, clean, m.KeepAliveTimer)

  183.                 case *proto.Publish:
  184.                         // TODO: Proper QoS support
  185.                         if m.Header.QosLevel != proto.QosAtMostOnce {
  186.                                 log.Printf("reader: no support for QoS %v yet", m.Header.QosLevel)
  187.                                 return
  188.                         }
  189.                         if m.Header.QosLevel != proto.QosAtMostOnce && m.MessageId == 0 {
  190.                                 // Invalid message ID. See MQTT-2.3.1-1.
  191.                                 log.Printf("reader: invalid MessageId in PUBLISH.")
  192.                                 return
  193.                         }
  194.                         if isWildcard(m.TopicName) {
  195.                                 log.Print("reader: ignoring PUBLISH with wildcard topic ", m.TopicName)
  196.                         } else {
  197.                                 c.svr.subs.submit(c, m)
  198.                         }
  199.                         c.submit(&proto.PubAck{MessageId: m.MessageId})

  200.                 case *proto.PingReq:
  201.                         c.submit(&proto.PingResp{})

  202.                 case *proto.Subscribe:
  203.                         if m.Header.QosLevel != proto.QosAtLeastOnce {
  204.                                 // protocol error, disconnect
  205.                                 return
  206.                         }
  207.                         if m.MessageId == 0 {
  208.                                 // Invalid message ID. See MQTT-2.3.1-1.
  209.                                 log.Printf("reader: invalid MessageId in SUBSCRIBE.")
  210.                                 return
  211.                         }
  212.                         suback := &proto.SubAck{
  213.                                 MessageId: m.MessageId,
  214.                                 TopicsQos: make([]proto.QosLevel, len(m.Topics)),
  215.                         }
  216.                         for i, tq := range m.Topics {
  217.                                 // TODO: Handle varying QoS correctly
  218.                                 c.svr.subs.add(tq.Topic, c)
  219.                                 suback.TopicsQos[i] = proto.QosAtMostOnce
  220.                         }
  221.                         c.submit(suback)

  222.                         // Process retained messages.
  223.                         for _, tq := range m.Topics {
  224.                                 c.svr.subs.sendRetain(tq.Topic, c)
  225.                         }

  226.                 case *proto.Unsubscribe:
  227.                         if m.Header.QosLevel != proto.QosAtMostOnce && m.MessageId == 0 {
  228.                                 // Invalid message ID. See MQTT-2.3.1-1.
  229.                                 log.Printf("reader: invalid MessageId in UNSUBSCRIBE.")
  230.                                 return
  231.                         }
  232.                         for _, t := range m.Topics {
  233.                                 c.svr.subs.unsub(t, c)
  234.                         }
  235.                         ack := &proto.UnsubAck{MessageId: m.MessageId}
  236.                         c.submit(ack)

  237.                 case *proto.Disconnect:
  238.                         return

  239.                 default:
  240.                         log.Printf("reader: unknown msg type %T", m)
  241.                         return
  242.                 }
  243.         }
  244. }

  245. func (c *incomingConn) writer() {

  246.         // Close connection on exit in order to cause reader to exit.
  247.         defer func() {
  248.                 c.conn.Close()
  249.                 c.del()
  250.                 c.svr.subs.unsubAll(c)
  251.         }()

  252.         for job := range c.jobs {
  253.                 if c.svr.Dump {
  254.                         log.Printf("dump out: %T", job.m)
  255.                 }

  256.                 // TODO: write timeout
  257.                 err := job.m.Encode(c.conn)
  258.                 if job.r != nil {
  259.                         // notifiy the sender that this message is sent
  260.                         close(job.r)
  261.                 }
  262.                 if err != nil {
  263.                         // This one is not interesting; it happens when clients
  264.                         // disappear before we send their acks.
  265.                         oe, isoe := err.(*net.OpError)
  266.                         if isoe && oe.Err.Error() == "use of closed network connection" {
  267.                                 return
  268.                         }
  269.                         // In Go < 1.5, the error is not an OpError.
  270.                         if err.Error() == "use of closed network connection" {
  271.                                 return
  272.                         }

  273.                         log.Print("writer: ", err)
  274.                         return
  275.                 }
  276.                 c.svr.stats.messageSend()

  277.                 if _, ok := job.m.(*proto.Disconnect); ok {
  278.                         log.Print("writer: sent disconnect message")
  279.                         return
  280.                 }
  281.         }
  282. }

  283. // header is used to initialize a proto.Header when the zero value
  284. // is not correct. The zero value of proto.Header is
  285. // the equivalent of header(dupFalse, proto.QosAtMostOnce, retainFalse)
  286. // and is correct for most messages.
  287. func header(d dupFlag, q proto.QosLevel, r retainFlag) proto.Header {
  288.         return proto.Header{
  289.                 DupFlag: bool(d), QosLevel: q, Retain: bool(r),
  290.         }
  291. }

  292. type retainFlag bool
  293. type dupFlag bool

  294. const (
  295.         retainFalse retainFlag = false
  296.         retainTrue             = true
  297.         dupFalse    dupFlag    = false
  298.         dupTrue                = true
  299. )

  300. func isWildcard(topic string) bool {
  301.         if strings.Contains(topic, "#") || strings.Contains(topic, "+") {
  302.                 return true
  303.         }
  304.         return false
  305. }

  306. func (w wild) valid() bool {
  307.         for i, part := range w.wild {
  308.                 // catch things like finance#
  309.                 if isWildcard(part) && len(part) != 1 {
  310.                         return false
  311.                 }
  312.                 // # can only occur as the last part
  313.                 if part == "#" && i != len(w.wild)-1 {
  314.                         return false
  315.                 }
  316.         }
  317.         return true
  318. }

  319. const clientQueueLength = 100

  320. // A ClientConn holds all the state associated with a connection
  321. // to an MQTT server. It should be allocated via NewClientConn.
  322. // Concurrent access to a ClientConn is NOT safe.
  323. type ClientConn struct {
  324.         ClientId string              // May be set before the call to Connect.
  325.         Dump     bool                // When true, dump the messages in and out.
  326.         Incoming chan *proto.Publish // Incoming messages arrive on this channel.
  327.         id       uint16              // next MessageId
  328.         out      chan job
  329.         conn     net.Conn
  330.         done     chan struct{} // This channel will be readable once a Disconnect has been successfully sent and the connection is closed.
  331.         connack  chan *proto.ConnAck
  332.         suback   chan *proto.SubAck
  333. }

  334. // NewClientConn allocates a new ClientConn.
  335. func NewClientConn(c net.Conn) *ClientConn {
  336.         cc := &ClientConn{
  337.                 conn:     c,
  338.                 id:       1,
  339.                 out:      make(chan job, clientQueueLength),
  340.                 Incoming: make(chan *proto.Publish, clientQueueLength),
  341.                 done:     make(chan struct{}),
  342.                 connack:  make(chan *proto.ConnAck),
  343.                 suback:   make(chan *proto.SubAck),
  344.         }
  345.         go cc.reader()
  346.         go cc.writer()
  347.         return cc
  348. }

  349. func (c *ClientConn) reader() {
  350.         defer func() {
  351.                 // Cause the writer to exit.
  352.                 close(c.out)
  353.                 // Cause any goroutines waiting on messages to arrive to exit.
  354.                 close(c.Incoming)
  355.                 c.conn.Close()
  356.         }()

  357.         for {
  358.                 // TODO: timeout (first message and/or keepalives)
  359.                 m, err := proto.DecodeOneMessage(c.conn, nil)
  360.                 if err != nil {
  361.                         if err == io.EOF {
  362.                                 return
  363.                         }
  364.                         if strings.HasSuffix(err.Error(), "use of closed network connection") {
  365.                                 return
  366.                         }
  367.                         log.Print("cli reader: ", err)
  368.                         return
  369.                 }

  370.                 if c.Dump {
  371.                         log.Printf("dump  in: %T", m)
  372.                 }

  373.                 switch m := m.(type) {
  374.                 case *proto.Publish:
  375.                         c.Incoming <- m
  376.                 case *proto.PubAck:
  377.                         // ignore these
  378.                         continue
  379.                 case *proto.ConnAck:
  380.                         c.connack <- m
  381.                 case *proto.SubAck:
  382.                         c.suback <- m
  383.                 case *proto.Disconnect:
  384.                         return
  385.                 default:
  386.                         log.Printf("cli reader: got msg type %T", m)
  387.                 }
  388.         }
  389. }

  390. func (c *ClientConn) writer() {
  391.         // Close connection on exit in order to cause reader to exit.
  392.         defer func() {
  393.                 // Signal to Disconnect() that the message is on its way, or
  394.                 // that the connection is closing one way or the other...
  395.                 close(c.done)
  396.         }()

  397.         for job := range c.out {
  398.                 if c.Dump {
  399.                         log.Printf("dump out: %T", job.m)
  400.                 }

  401.                 // TODO: write timeout
  402.                 err := job.m.Encode(c.conn)
  403.                 if job.r != nil {
  404.                         close(job.r)
  405.                 }

  406.                 if err != nil {
  407.                         log.Print("cli writer: ", err)
  408.                         return
  409.                 }

  410.                 if _, ok := job.m.(*proto.Disconnect); ok {
  411.                         return
  412.                 }
  413.         }
  414. }

  415. // Connect sends the CONNECT message to the server. If the ClientId is not already
  416. // set, use a default (a 63-bit decimal random number). The "clean session"
  417. // bit is always set.
  418. func (c *ClientConn) Connect(user, pass string) error {
  419.         // TODO: Keepalive timer
  420.         if c.ClientId == "" {
  421.                 c.ClientId = fmt.Sprint(cliRand.Int63())
  422.         }
  423.         req := &proto.Connect{
  424.                 ProtocolName:    "MQIsdp",
  425.                 ProtocolVersion: 3,
  426.                 ClientId:        c.ClientId,
  427.                 CleanSession:    true,
  428.         }
  429.         if user != "" {
  430.                 req.UsernameFlag = true
  431.                 req.PasswordFlag = true
  432.                 req.Username = user
  433.                 req.Password = pass
  434.         }

  435.         c.sync(req)
  436.         ack := <-c.connack
  437.         return ConnectionErrors[ack.ReturnCode]
  438. }

  439. // ConnectionErrors is an array of errors corresponding to the
  440. // Connect return codes specified in the specification.
  441. var ConnectionErrors = [6]error{
  442.         nil, // Connection Accepted (not an error)
  443.         errors.New("Connection Refused: unacceptable protocol version"),
  444.         errors.New("Connection Refused: identifier rejected"),
  445.         errors.New("Connection Refused: server unavailable"),
  446.         errors.New("Connection Refused: bad user name or password"),
  447.         errors.New("Connection Refused: not authorized"),
  448. }

  449. // Disconnect sends a DISCONNECT message to the server. This function
  450. // blocks until the disconnect message is actually sent, and the connection
  451. // is closed.
  452. func (c *ClientConn) Disconnect() {
  453.         c.sync(&proto.Disconnect{})
  454.         <-c.done
  455. }

  456. func (c *ClientConn) nextid() uint16 {
  457.         id := c.id
  458.         c.id++
  459.         return id
  460. }

  461. // Subscribe subscribes this connection to a list of topics. Messages
  462. // will be delivered on the Incoming channel.
  463. func (c *ClientConn) Subscribe(tqs []proto.TopicQos) *proto.SubAck {
  464.         c.sync(&proto.Subscribe{
  465.                 Header:    header(dupFalse, proto.QosAtLeastOnce, retainFalse),
  466.                 MessageId: c.nextid(),
  467.                 Topics:    tqs,
  468.         })
  469.         ack := <-c.suback
  470.         return ack
  471. }

  472. // Publish publishes the given message to the MQTT server.
  473. // The QosLevel of the message must be QosAtLeastOnce for now.
  474. func (c *ClientConn) Publish(m *proto.Publish) {
  475.         if m.QosLevel != proto.QosAtMostOnce {
  476.                 panic("unsupported QoS level")
  477.         }
  478.         m.MessageId = c.nextid()
  479.         c.out <- job{m: m}
  480. }

  481. // sync sends a message and blocks until it was actually sent.
  482. func (c *ClientConn) sync(m proto.Message) {
  483.         j := job{m: m, r: make(receipt)}
  484.         c.out <- j
  485.         <-j.r
  486.         return
  487. }
复制代码

使用特权

评论回复
 楼主 | 2017-3-7 11:00 | 显示全部楼层
keer_zu 发表于 2017-3-7 10:57
文件太长,下面是钱368行:

// Package mqtt implements MQTT clients and servers.

记下来就一起读一下这部分代码

使用特权

评论回复
 楼主 | 2017-3-7 11:55 | 显示全部楼层
keer_zu 发表于 2017-3-7 10:57
文件太长,下面是钱368行:

第23行 var cliRand *rand.Rand 声明一个Rand类型的指针变量cliRand;

第25行 func init() 也就输入(Read)参数到sb,然后算出一个seed,最后将输入seed创建的Rand赋值给cliRand

第35行 type stats struct 是个一组统计值。后面几个func定义了它的几个方法。应该是统计上述值的。


使用特权

评论回复
 楼主 | 2017-3-7 12:07 | 显示全部楼层
renxiaolin 发表于 2017-3-7 10:57
慢慢玩吧,有你爽的时候

使用特权

评论回复
| 2017-3-7 12:15 | 显示全部楼层
这个代码我没仔细看过,并且mqtt服务器的代码也不用自己写,客户端会用api函数就OK

使用特权

评论回复
 楼主 | 2017-3-7 13:20 | 显示全部楼层
renxiaolin 发表于 2017-3-7 12:15
这个代码我没仔细看过,并且mqtt服务器的代码也不用自己写,客户端会用api函数就OK ...

如果要做个服务器呢?

使用特权

评论回复
| 2017-3-7 13:27 | 显示全部楼层
keer_zu 发表于 2017-3-7 13:20
如果要做个服务器呢?

你确认你自己写的比人家反复锤炼的好?我不需要自己写,我只需要把人家服务器的代码跑到自己的机器就OK啦

使用特权

评论回复
 楼主 | 2017-3-7 13:58 | 显示全部楼层
renxiaolin 发表于 2017-3-7 13:27
你确认你自己写的比人家反复锤炼的好?我不需要自己写,我只需要把人家服务器的代码跑到自己的机器就OK ...

我不会做和他们一样的东西的,但我的比他们更实用。

使用特权

评论回复
| 2017-3-7 13:59 | 显示全部楼层
keer_zu 发表于 2017-3-7 13:58
我不会做和他们一样的东西的,但我的比他们更实用。

使用特权

评论回复
 楼主 | 2017-3-7 14:02 | 显示全部楼层

必须的,否则这样做有什么意义。

使用特权

评论回复
| 2017-3-7 14:04 | 显示全部楼层
keer_zu 发表于 2017-3-7 14:02
必须的,否则这样做有什么意义。

我以为你在学习,看来是小看你啦

使用特权

评论回复
 楼主 | 2017-3-7 14:08 | 显示全部楼层
renxiaolin 发表于 2017-3-7 14:04
我以为你在学习,看来是小看你啦

实践就是做好的学习,等学完了也过时了。

使用特权

评论回复
 楼主 | 2017-3-7 14:08 | 显示全部楼层
renxiaolin 发表于 2017-3-7 14:04
我以为你在学习,看来是小看你啦

还有,物联网是复杂的,决定了短时间没有哪家能一统江湖,人人都有参与的机会。

使用特权

评论回复
 楼主 | 2017-3-7 14:39 | 显示全部楼层
keer_zu 发表于 2017-3-7 10:57
文件太长,下面是钱368行:

第48行 func statsMessage(topic string, stat int64) *proto.Publish  
    * 这里引用到了另外一个github项目:github.com/huin/mqtt  /huin/mqtt分析

使用特权

评论回复
| 2017-3-7 15:17 | 显示全部楼层
keer_zu 发表于 2017-3-7 14:39
第48行 func statsMessage(topic string, stat int64) *proto.Publish  
    * 这里引用到了另外一个gith ...

再摩拜一眼,仍是不明觉厉

使用特权

评论回复
 楼主 | 2017-3-7 15:21 | 显示全部楼层
yyy71cj 发表于 2017-3-7 15:17
再摩拜一眼,仍是不明觉厉

其实就是go语言和mqtt协议啦。

使用特权

评论回复
扫描二维码,随时随地手机跟帖
您需要登录后才可以回帖 登录 | 注册

本版积分规则

我要发帖 投诉建议 创建版块 申请版主

快速回复

您需要登录后才可以回帖
登录 | 注册
高级模式

论坛热帖

关闭

热门推荐上一条 /2 下一条

在线客服 快速回复 返回顶部 返回列表