// Start makes the Server start accepting and handling connections.
func (s *Server) Start() {
go func() {
for {
conn, err := s.l.Accept()
if err != nil {
log.Print("Accept: ", err)
break
}
cli := s.newIncomingConn(conn)
s.stats.clientConnect()
cli.start()
}
close(s.Done)
}()
}
// An IncomingConn represents a connection into a Server.
type incomingConn struct {
svr *Server
conn net.Conn
jobs chan job
clientid string
Done chan struct{}
}
var clients = make(map[string]*incomingConn)
var clientsMu sync.Mutex
const sendingQueueLength = 10000
// newIncomingConn creates a new incomingConn associated with this
// server. The connection becomes the property of the incomingConn
// and should not be touched again by the caller until the Done
// channel becomes readable.
func (s *Server) newIncomingConn(conn net.Conn) *incomingConn {
return &incomingConn{
svr: s,
conn: conn,
jobs: make(chan job, sendingQueueLength),
Done: make(chan struct{}),
}
}
type receipt chan struct{}
// Wait for the receipt to indicate that the job is done.
func (r receipt) wait() {
// TODO: timeout
<-r
}
type job struct {
m proto.Message
r receipt
}
// Start reading and writing on this connection.
func (c *incomingConn) start() {
go c.reader()
go c.writer()
}
// Add this connection to the map, or find out that an existing connection
// already exists for the same client-id.
func (c *incomingConn) add() *incomingConn {
clientsMu.Lock()
defer clientsMu.Unlock()
existing, ok := clients[c.clientid]
if !ok {
// this client id already exists, return it
return existing
}
clients[c.clientid] = c
return nil
}
// Delete a connection; the conection must be closed by the caller first.
func (c *incomingConn) del() {
clientsMu.Lock()
defer clientsMu.Unlock()
delete(clients, c.clientid)
return
}
// Replace any existing connection with this one. The one to be replaced,
// if any, must be closed first by the caller.
func (c *incomingConn) replace() {
clientsMu.Lock()
defer clientsMu.Unlock()
// Check that any existing connection is already closed.
existing, ok := clients[c.clientid]
if ok {
die := false
select {
case _, ok := <-existing.jobs:
// what? we are expecting that this channel is closed!
if ok {
die = true
}
default:
die = true
}
if die {
panic("attempting to replace a connection that is not closed")
}
delete(clients, c.clientid)
}
clients[c.clientid] = c
return
}
// Queue a message; no notification of sending is done.
func (c *incomingConn) submit(m proto.Message) {
j := job{m: m}
select {
case c.jobs <- j:
default:
log.Print(c, ": failed to submit message")
}
return
}
func (c *incomingConn) String() string {
return fmt.Sprintf("{IncomingConn: %v}", c.clientid)
}
// Queue a message, returns a channel that will be readable
// when the message is sent.
func (c *incomingConn) submitSync(m proto.Message) receipt {
j := job{m: m, r: make(receipt)}
c.jobs <- j
return j.r
}
func (c *incomingConn) reader() {
// On exit, close the connection and arrange for the writer to exit
// by closing the output channel.
defer func() {
c.conn.Close()
c.svr.stats.clientDisconnect()
close(c.jobs)
}()
for {
// TODO: timeout (first message and/or keepalives)
m, err := proto.DecodeOneMessage(c.conn, nil)
if err != nil {
if err == io.EOF {
return
}
if strings.HasSuffix(err.Error(), "use of closed network connection") {
return
}
log.Print("reader: ", err)
return
}
c.svr.stats.messageRecv()
if c.svr.Dump {
log.Printf("dump in: %T", m)
}
switch m := m.(type) {
case *proto.Connect:
rc := proto.RetCodeAccepted
if m.ProtocolName != "MQIsdp" ||
m.ProtocolVersion != 3 {
log.Print("reader: reject connection from ", m.ProtocolName, " version ", m.ProtocolVersion)
rc = proto.RetCodeUnacceptableProtocolVersion
}
// Check client id.
if len(m.ClientId) < 1 || len(m.ClientId) > 23 {
rc = proto.RetCodeIdentifierRejected
}
c.clientid = m.ClientId
// Disconnect existing connections.
if existing := c.add(); existing != nil {
disconnect := &proto.Disconnect{}
r := existing.submitSync(disconnect)
r.wait()
existing.del()
}
c.add()
// TODO: Last will
connack := &proto.ConnAck{
ReturnCode: rc,
}
c.submit(connack)
// close connection if it was a bad connect
if rc != proto.RetCodeAccepted {
log.Printf("Connection refused for %v: %v", c.conn.RemoteAddr(), ConnectionErrors[rc])
return
}
// Log in mosquitto format.
clean := 0
if m.CleanSession {
clean = 1
}
log.Printf("New client connected from %v as %v (c%v, k%v).", c.conn.RemoteAddr(), c.clientid, clean, m.KeepAliveTimer)
case *proto.Publish:
// TODO: Proper QoS support
if m.Header.QosLevel != proto.QosAtMostOnce {
log.Printf("reader: no support for QoS %v yet", m.Header.QosLevel)
return
}
if m.Header.QosLevel != proto.QosAtMostOnce && m.MessageId == 0 {
// Invalid message ID. See MQTT-2.3.1-1.
log.Printf("reader: invalid MessageId in PUBLISH.")
return
}
if isWildcard(m.TopicName) {
log.Print("reader: ignoring PUBLISH with wildcard topic ", m.TopicName)
} else {
c.svr.subs.submit(c, m)
}
c.submit(&proto.PubAck{MessageId: m.MessageId})
case *proto.PingReq:
c.submit(&proto.PingResp{})
case *proto.Subscribe:
if m.Header.QosLevel != proto.QosAtLeastOnce {
// protocol error, disconnect
return
}
if m.MessageId == 0 {
// Invalid message ID. See MQTT-2.3.1-1.
log.Printf("reader: invalid MessageId in SUBSCRIBE.")
return
}
suback := &proto.SubAck{
MessageId: m.MessageId,
TopicsQos: make([]proto.QosLevel, len(m.Topics)),
}
for i, tq := range m.Topics {
// TODO: Handle varying QoS correctly
c.svr.subs.add(tq.Topic, c)
suback.TopicsQos[i] = proto.QosAtMostOnce
}
c.submit(suback)
// Process retained messages.
for _, tq := range m.Topics {
c.svr.subs.sendRetain(tq.Topic, c)
}
case *proto.Unsubscribe:
if m.Header.QosLevel != proto.QosAtMostOnce && m.MessageId == 0 {
// Invalid message ID. See MQTT-2.3.1-1.
log.Printf("reader: invalid MessageId in UNSUBSCRIBE.")
return
}
for _, t := range m.Topics {
c.svr.subs.unsub(t, c)
}
ack := &proto.UnsubAck{MessageId: m.MessageId}
c.submit(ack)
case *proto.Disconnect:
return
default:
log.Printf("reader: unknown msg type %T", m)
return
}
}
}
func (c *incomingConn) writer() {
// Close connection on exit in order to cause reader to exit.
defer func() {
c.conn.Close()
c.del()
c.svr.subs.unsubAll(c)
}()
for job := range c.jobs {
if c.svr.Dump {
log.Printf("dump out: %T", job.m)
}
// TODO: write timeout
err := job.m.Encode(c.conn)
if job.r != nil {
// notifiy the sender that this message is sent
close(job.r)
}
if err != nil {
// This one is not interesting; it happens when clients
// disappear before we send their acks.
oe, isoe := err.(*net.OpError)
if isoe && oe.Err.Error() == "use of closed network connection" {
return
}
// In Go < 1.5, the error is not an OpError.
if err.Error() == "use of closed network connection" {
return
}
log.Print("writer: ", err)
return
}
c.svr.stats.messageSend()
if _, ok := job.m.(*proto.Disconnect); ok {
log.Print("writer: sent disconnect message")
return
}
}
}
// header is used to initialize a proto.Header when the zero value
// is not correct. The zero value of proto.Header is
// the equivalent of header(dupFalse, proto.QosAtMostOnce, retainFalse)
// and is correct for most messages.
func header(d dupFlag, q proto.QosLevel, r retainFlag) proto.Header {
return proto.Header{
DupFlag: bool(d), QosLevel: q, Retain: bool(r),
}
}
type retainFlag bool
type dupFlag bool
const (
retainFalse retainFlag = false
retainTrue = true
dupFalse dupFlag = false
dupTrue = true
)
func isWildcard(topic string) bool {
if strings.Contains(topic, "#") || strings.Contains(topic, "+") {
return true
}
return false
}
func (w wild) valid() bool {
for i, part := range w.wild {
// catch things like finance#
if isWildcard(part) && len(part) != 1 {
return false
}
// # can only occur as the last part
if part == "#" && i != len(w.wild)-1 {
return false
}
}
return true
}
const clientQueueLength = 100
// A ClientConn holds all the state associated with a connection
// to an MQTT server. It should be allocated via NewClientConn.
// Concurrent access to a ClientConn is NOT safe.
type ClientConn struct {
ClientId string // May be set before the call to Connect.
Dump bool // When true, dump the messages in and out.
Incoming chan *proto.Publish // Incoming messages arrive on this channel.
id uint16 // next MessageId
out chan job
conn net.Conn
done chan struct{} // This channel will be readable once a Disconnect has been successfully sent and the connection is closed.
connack chan *proto.ConnAck
suback chan *proto.SubAck
}
// NewClientConn allocates a new ClientConn.
func NewClientConn(c net.Conn) *ClientConn {
cc := &ClientConn{
conn: c,
id: 1,
out: make(chan job, clientQueueLength),
Incoming: make(chan *proto.Publish, clientQueueLength),
done: make(chan struct{}),
connack: make(chan *proto.ConnAck),
suback: make(chan *proto.SubAck),
}
go cc.reader()
go cc.writer()
return cc
}
func (c *ClientConn) reader() {
defer func() {
// Cause the writer to exit.
close(c.out)
// Cause any goroutines waiting on messages to arrive to exit.
close(c.Incoming)
c.conn.Close()
}()
for {
// TODO: timeout (first message and/or keepalives)
m, err := proto.DecodeOneMessage(c.conn, nil)
if err != nil {
if err == io.EOF {
return
}
if strings.HasSuffix(err.Error(), "use of closed network connection") {
return
}
log.Print("cli reader: ", err)
return
}
if c.Dump {
log.Printf("dump in: %T", m)
}
switch m := m.(type) {
case *proto.Publish:
c.Incoming <- m
case *proto.PubAck:
// ignore these
continue
case *proto.ConnAck:
c.connack <- m
case *proto.SubAck:
c.suback <- m
case *proto.Disconnect:
return
default:
log.Printf("cli reader: got msg type %T", m)
}
}
}
func (c *ClientConn) writer() {
// Close connection on exit in order to cause reader to exit.
defer func() {
// Signal to Disconnect() that the message is on its way, or
// that the connection is closing one way or the other...
close(c.done)
}()
for job := range c.out {
if c.Dump {
log.Printf("dump out: %T", job.m)
}
// TODO: write timeout
err := job.m.Encode(c.conn)
if job.r != nil {
close(job.r)
}
if err != nil {
log.Print("cli writer: ", err)
return
}
if _, ok := job.m.(*proto.Disconnect); ok {
return
}
}
}
// Connect sends the CONNECT message to the server. If the ClientId is not already
// set, use a default (a 63-bit decimal random number). The "clean session"
// bit is always set.
func (c *ClientConn) Connect(user, pass string) error {
// TODO: Keepalive timer
if c.ClientId == "" {
c.ClientId = fmt.Sprint(cliRand.Int63())
}
req := &proto.Connect{
ProtocolName: "MQIsdp",
ProtocolVersion: 3,
ClientId: c.ClientId,
CleanSession: true,
}
if user != "" {
req.UsernameFlag = true
req.PasswordFlag = true
req.Username = user
req.Password = pass
}
c.sync(req)
ack := <-c.connack
return ConnectionErrors[ack.ReturnCode]
}
// ConnectionErrors is an array of errors corresponding to the
// Connect return codes specified in the specification.
var ConnectionErrors = [6]error{
nil, // Connection Accepted (not an error)
errors.New("Connection Refused: unacceptable protocol version"),
errors.New("Connection Refused: identifier rejected"),
errors.New("Connection Refused: server unavailable"),
errors.New("Connection Refused: bad user name or password"),
errors.New("Connection Refused: not authorized"),
}
// Disconnect sends a DISCONNECT message to the server. This function
// blocks until the disconnect message is actually sent, and the connection
// is closed.
func (c *ClientConn) Disconnect() {
c.sync(&proto.Disconnect{})
<-c.done
}
func (c *ClientConn) nextid() uint16 {
id := c.id
c.id++
return id
}
// Subscribe subscribes this connection to a list of topics. Messages
// will be delivered on the Incoming channel.
func (c *ClientConn) Subscribe(tqs []proto.TopicQos) *proto.SubAck {
c.sync(&proto.Subscribe{
Header: header(dupFalse, proto.QosAtLeastOnce, retainFalse),
MessageId: c.nextid(),
Topics: tqs,
})
ack := <-c.suback
return ack
}
// Publish publishes the given message to the MQTT server.
// The QosLevel of the message must be QosAtLeastOnce for now.
func (c *ClientConn) Publish(m *proto.Publish) {
if m.QosLevel != proto.QosAtMostOnce {
panic("unsupported QoS level")
}
m.MessageId = c.nextid()
c.out <- job{m: m}
}
// sync sends a message and blocks until it was actually sent.
func (c *ClientConn) sync(m proto.Message) {
j := job{m: m, r: make(receipt)}
c.out <- j
<-j.r
return
}