- // Start makes the Server start accepting and handling connections.
- func (s *Server) Start() {
- go func() {
- for {
- conn, err := s.l.Accept()
- if err != nil {
- log.Print("Accept: ", err)
- break
- }
- cli := s.newIncomingConn(conn)
- s.stats.clientConnect()
- cli.start()
- }
- close(s.Done)
- }()
- }
- // An IncomingConn represents a connection into a Server.
- type incomingConn struct {
- svr *Server
- conn net.Conn
- jobs chan job
- clientid string
- Done chan struct{}
- }
- var clients = make(map[string]*incomingConn)
- var clientsMu sync.Mutex
- const sendingQueueLength = 10000
- // newIncomingConn creates a new incomingConn associated with this
- // server. The connection becomes the property of the incomingConn
- // and should not be touched again by the caller until the Done
- // channel becomes readable.
- func (s *Server) newIncomingConn(conn net.Conn) *incomingConn {
- return &incomingConn{
- svr: s,
- conn: conn,
- jobs: make(chan job, sendingQueueLength),
- Done: make(chan struct{}),
- }
- }
- type receipt chan struct{}
- // Wait for the receipt to indicate that the job is done.
- func (r receipt) wait() {
- // TODO: timeout
- <-r
- }
- type job struct {
- m proto.Message
- r receipt
- }
- // Start reading and writing on this connection.
- func (c *incomingConn) start() {
- go c.reader()
- go c.writer()
- }
- // Add this connection to the map, or find out that an existing connection
- // already exists for the same client-id.
- func (c *incomingConn) add() *incomingConn {
- clientsMu.Lock()
- defer clientsMu.Unlock()
- existing, ok := clients[c.clientid]
- if !ok {
- // this client id already exists, return it
- return existing
- }
- clients[c.clientid] = c
- return nil
- }
- // Delete a connection; the conection must be closed by the caller first.
- func (c *incomingConn) del() {
- clientsMu.Lock()
- defer clientsMu.Unlock()
- delete(clients, c.clientid)
- return
- }
- // Replace any existing connection with this one. The one to be replaced,
- // if any, must be closed first by the caller.
- func (c *incomingConn) replace() {
- clientsMu.Lock()
- defer clientsMu.Unlock()
- // Check that any existing connection is already closed.
- existing, ok := clients[c.clientid]
- if ok {
- die := false
- select {
- case _, ok := <-existing.jobs:
- // what? we are expecting that this channel is closed!
- if ok {
- die = true
- }
- default:
- die = true
- }
- if die {
- panic("attempting to replace a connection that is not closed")
- }
- delete(clients, c.clientid)
- }
- clients[c.clientid] = c
- return
- }
- // Queue a message; no notification of sending is done.
- func (c *incomingConn) submit(m proto.Message) {
- j := job{m: m}
- select {
- case c.jobs <- j:
- default:
- log.Print(c, ": failed to submit message")
- }
- return
- }
- func (c *incomingConn) String() string {
- return fmt.Sprintf("{IncomingConn: %v}", c.clientid)
- }
- // Queue a message, returns a channel that will be readable
- // when the message is sent.
- func (c *incomingConn) submitSync(m proto.Message) receipt {
- j := job{m: m, r: make(receipt)}
- c.jobs <- j
- return j.r
- }
- func (c *incomingConn) reader() {
- // On exit, close the connection and arrange for the writer to exit
- // by closing the output channel.
- defer func() {
- c.conn.Close()
- c.svr.stats.clientDisconnect()
- close(c.jobs)
- }()
- for {
- // TODO: timeout (first message and/or keepalives)
- m, err := proto.DecodeOneMessage(c.conn, nil)
- if err != nil {
- if err == io.EOF {
- return
- }
- if strings.HasSuffix(err.Error(), "use of closed network connection") {
- return
- }
- log.Print("reader: ", err)
- return
- }
- c.svr.stats.messageRecv()
- if c.svr.Dump {
- log.Printf("dump in: %T", m)
- }
- switch m := m.(type) {
- case *proto.Connect:
- rc := proto.RetCodeAccepted
- if m.ProtocolName != "MQIsdp" ||
- m.ProtocolVersion != 3 {
- log.Print("reader: reject connection from ", m.ProtocolName, " version ", m.ProtocolVersion)
- rc = proto.RetCodeUnacceptableProtocolVersion
- }
- // Check client id.
- if len(m.ClientId) < 1 || len(m.ClientId) > 23 {
- rc = proto.RetCodeIdentifierRejected
- }
- c.clientid = m.ClientId
- // Disconnect existing connections.
- if existing := c.add(); existing != nil {
- disconnect := &proto.Disconnect{}
- r := existing.submitSync(disconnect)
- r.wait()
- existing.del()
- }
- c.add()
- // TODO: Last will
- connack := &proto.ConnAck{
- ReturnCode: rc,
- }
- c.submit(connack)
- // close connection if it was a bad connect
- if rc != proto.RetCodeAccepted {
- log.Printf("Connection refused for %v: %v", c.conn.RemoteAddr(), ConnectionErrors[rc])
- return
- }
- // Log in mosquitto format.
- clean := 0
- if m.CleanSession {
- clean = 1
- }
- log.Printf("New client connected from %v as %v (c%v, k%v).", c.conn.RemoteAddr(), c.clientid, clean, m.KeepAliveTimer)
- case *proto.Publish:
- // TODO: Proper QoS support
- if m.Header.QosLevel != proto.QosAtMostOnce {
- log.Printf("reader: no support for QoS %v yet", m.Header.QosLevel)
- return
- }
- if m.Header.QosLevel != proto.QosAtMostOnce && m.MessageId == 0 {
- // Invalid message ID. See MQTT-2.3.1-1.
- log.Printf("reader: invalid MessageId in PUBLISH.")
- return
- }
- if isWildcard(m.TopicName) {
- log.Print("reader: ignoring PUBLISH with wildcard topic ", m.TopicName)
- } else {
- c.svr.subs.submit(c, m)
- }
- c.submit(&proto.PubAck{MessageId: m.MessageId})
- case *proto.PingReq:
- c.submit(&proto.PingResp{})
- case *proto.Subscribe:
- if m.Header.QosLevel != proto.QosAtLeastOnce {
- // protocol error, disconnect
- return
- }
- if m.MessageId == 0 {
- // Invalid message ID. See MQTT-2.3.1-1.
- log.Printf("reader: invalid MessageId in SUBSCRIBE.")
- return
- }
- suback := &proto.SubAck{
- MessageId: m.MessageId,
- TopicsQos: make([]proto.QosLevel, len(m.Topics)),
- }
- for i, tq := range m.Topics {
- // TODO: Handle varying QoS correctly
- c.svr.subs.add(tq.Topic, c)
- suback.TopicsQos[i] = proto.QosAtMostOnce
- }
- c.submit(suback)
- // Process retained messages.
- for _, tq := range m.Topics {
- c.svr.subs.sendRetain(tq.Topic, c)
- }
- case *proto.Unsubscribe:
- if m.Header.QosLevel != proto.QosAtMostOnce && m.MessageId == 0 {
- // Invalid message ID. See MQTT-2.3.1-1.
- log.Printf("reader: invalid MessageId in UNSUBSCRIBE.")
- return
- }
- for _, t := range m.Topics {
- c.svr.subs.unsub(t, c)
- }
- ack := &proto.UnsubAck{MessageId: m.MessageId}
- c.submit(ack)
- case *proto.Disconnect:
- return
- default:
- log.Printf("reader: unknown msg type %T", m)
- return
- }
- }
- }
- func (c *incomingConn) writer() {
- // Close connection on exit in order to cause reader to exit.
- defer func() {
- c.conn.Close()
- c.del()
- c.svr.subs.unsubAll(c)
- }()
- for job := range c.jobs {
- if c.svr.Dump {
- log.Printf("dump out: %T", job.m)
- }
- // TODO: write timeout
- err := job.m.Encode(c.conn)
- if job.r != nil {
- // notifiy the sender that this message is sent
- close(job.r)
- }
- if err != nil {
- // This one is not interesting; it happens when clients
- // disappear before we send their acks.
- oe, isoe := err.(*net.OpError)
- if isoe && oe.Err.Error() == "use of closed network connection" {
- return
- }
- // In Go < 1.5, the error is not an OpError.
- if err.Error() == "use of closed network connection" {
- return
- }
- log.Print("writer: ", err)
- return
- }
- c.svr.stats.messageSend()
- if _, ok := job.m.(*proto.Disconnect); ok {
- log.Print("writer: sent disconnect message")
- return
- }
- }
- }
- // header is used to initialize a proto.Header when the zero value
- // is not correct. The zero value of proto.Header is
- // the equivalent of header(dupFalse, proto.QosAtMostOnce, retainFalse)
- // and is correct for most messages.
- func header(d dupFlag, q proto.QosLevel, r retainFlag) proto.Header {
- return proto.Header{
- DupFlag: bool(d), QosLevel: q, Retain: bool(r),
- }
- }
- type retainFlag bool
- type dupFlag bool
- const (
- retainFalse retainFlag = false
- retainTrue = true
- dupFalse dupFlag = false
- dupTrue = true
- )
- func isWildcard(topic string) bool {
- if strings.Contains(topic, "#") || strings.Contains(topic, "+") {
- return true
- }
- return false
- }
- func (w wild) valid() bool {
- for i, part := range w.wild {
- // catch things like finance#
- if isWildcard(part) && len(part) != 1 {
- return false
- }
- // # can only occur as the last part
- if part == "#" && i != len(w.wild)-1 {
- return false
- }
- }
- return true
- }
- const clientQueueLength = 100
- // A ClientConn holds all the state associated with a connection
- // to an MQTT server. It should be allocated via NewClientConn.
- // Concurrent access to a ClientConn is NOT safe.
- type ClientConn struct {
- ClientId string // May be set before the call to Connect.
- Dump bool // When true, dump the messages in and out.
- Incoming chan *proto.Publish // Incoming messages arrive on this channel.
- id uint16 // next MessageId
- out chan job
- conn net.Conn
- done chan struct{} // This channel will be readable once a Disconnect has been successfully sent and the connection is closed.
- connack chan *proto.ConnAck
- suback chan *proto.SubAck
- }
- // NewClientConn allocates a new ClientConn.
- func NewClientConn(c net.Conn) *ClientConn {
- cc := &ClientConn{
- conn: c,
- id: 1,
- out: make(chan job, clientQueueLength),
- Incoming: make(chan *proto.Publish, clientQueueLength),
- done: make(chan struct{}),
- connack: make(chan *proto.ConnAck),
- suback: make(chan *proto.SubAck),
- }
- go cc.reader()
- go cc.writer()
- return cc
- }
- func (c *ClientConn) reader() {
- defer func() {
- // Cause the writer to exit.
- close(c.out)
- // Cause any goroutines waiting on messages to arrive to exit.
- close(c.Incoming)
- c.conn.Close()
- }()
- for {
- // TODO: timeout (first message and/or keepalives)
- m, err := proto.DecodeOneMessage(c.conn, nil)
- if err != nil {
- if err == io.EOF {
- return
- }
- if strings.HasSuffix(err.Error(), "use of closed network connection") {
- return
- }
- log.Print("cli reader: ", err)
- return
- }
- if c.Dump {
- log.Printf("dump in: %T", m)
- }
- switch m := m.(type) {
- case *proto.Publish:
- c.Incoming <- m
- case *proto.PubAck:
- // ignore these
- continue
- case *proto.ConnAck:
- c.connack <- m
- case *proto.SubAck:
- c.suback <- m
- case *proto.Disconnect:
- return
- default:
- log.Printf("cli reader: got msg type %T", m)
- }
- }
- }
- func (c *ClientConn) writer() {
- // Close connection on exit in order to cause reader to exit.
- defer func() {
- // Signal to Disconnect() that the message is on its way, or
- // that the connection is closing one way or the other...
- close(c.done)
- }()
- for job := range c.out {
- if c.Dump {
- log.Printf("dump out: %T", job.m)
- }
- // TODO: write timeout
- err := job.m.Encode(c.conn)
- if job.r != nil {
- close(job.r)
- }
- if err != nil {
- log.Print("cli writer: ", err)
- return
- }
- if _, ok := job.m.(*proto.Disconnect); ok {
- return
- }
- }
- }
- // Connect sends the CONNECT message to the server. If the ClientId is not already
- // set, use a default (a 63-bit decimal random number). The "clean session"
- // bit is always set.
- func (c *ClientConn) Connect(user, pass string) error {
- // TODO: Keepalive timer
- if c.ClientId == "" {
- c.ClientId = fmt.Sprint(cliRand.Int63())
- }
- req := &proto.Connect{
- ProtocolName: "MQIsdp",
- ProtocolVersion: 3,
- ClientId: c.ClientId,
- CleanSession: true,
- }
- if user != "" {
- req.UsernameFlag = true
- req.PasswordFlag = true
- req.Username = user
- req.Password = pass
- }
- c.sync(req)
- ack := <-c.connack
- return ConnectionErrors[ack.ReturnCode]
- }
- // ConnectionErrors is an array of errors corresponding to the
- // Connect return codes specified in the specification.
- var ConnectionErrors = [6]error{
- nil, // Connection Accepted (not an error)
- errors.New("Connection Refused: unacceptable protocol version"),
- errors.New("Connection Refused: identifier rejected"),
- errors.New("Connection Refused: server unavailable"),
- errors.New("Connection Refused: bad user name or password"),
- errors.New("Connection Refused: not authorized"),
- }
- // Disconnect sends a DISCONNECT message to the server. This function
- // blocks until the disconnect message is actually sent, and the connection
- // is closed.
- func (c *ClientConn) Disconnect() {
- c.sync(&proto.Disconnect{})
- <-c.done
- }
- func (c *ClientConn) nextid() uint16 {
- id := c.id
- c.id++
- return id
- }
- // Subscribe subscribes this connection to a list of topics. Messages
- // will be delivered on the Incoming channel.
- func (c *ClientConn) Subscribe(tqs []proto.TopicQos) *proto.SubAck {
- c.sync(&proto.Subscribe{
- Header: header(dupFalse, proto.QosAtLeastOnce, retainFalse),
- MessageId: c.nextid(),
- Topics: tqs,
- })
- ack := <-c.suback
- return ack
- }
- // Publish publishes the given message to the MQTT server.
- // The QosLevel of the message must be QosAtLeastOnce for now.
- func (c *ClientConn) Publish(m *proto.Publish) {
- if m.QosLevel != proto.QosAtMostOnce {
- panic("unsupported QoS level")
- }
- m.MessageId = c.nextid()
- c.out <- job{m: m}
- }
- // sync sends a message and blocks until it was actually sent.
- func (c *ClientConn) sync(m proto.Message) {
- j := job{m: m, r: make(receipt)}
- c.out <- j
- <-j.r
- return
- }