- // Start makes the Server start accepting and handling connections.
 
- func (s *Server) Start() {
 
-         go func() {
 
-                 for {
 
-                         conn, err := s.l.Accept()
 
-                         if err != nil {
 
-                                 log.Print("Accept: ", err)
 
-                                 break
 
-                         }
 
 
-                         cli := s.newIncomingConn(conn)
 
-                         s.stats.clientConnect()
 
-                         cli.start()
 
-                 }
 
-                 close(s.Done)
 
-         }()
 
- }
 
 
- // An IncomingConn represents a connection into a Server.
 
- type incomingConn struct {
 
-         svr      *Server
 
-         conn     net.Conn
 
-         jobs     chan job
 
-         clientid string
 
-         Done     chan struct{}
 
- }
 
 
- var clients = make(map[string]*incomingConn)
 
- var clientsMu sync.Mutex
 
 
- const sendingQueueLength = 10000
 
 
- // newIncomingConn creates a new incomingConn associated with this
 
- // server. The connection becomes the property of the incomingConn
 
- // and should not be touched again by the caller until the Done
 
- // channel becomes readable.
 
- func (s *Server) newIncomingConn(conn net.Conn) *incomingConn {
 
-         return &incomingConn{
 
-                 svr:  s,
 
-                 conn: conn,
 
-                 jobs: make(chan job, sendingQueueLength),
 
-                 Done: make(chan struct{}),
 
-         }
 
- }
 
 
- type receipt chan struct{}
 
 
- // Wait for the receipt to indicate that the job is done.
 
- func (r receipt) wait() {
 
-         // TODO: timeout
 
-         <-r
 
- }
 
 
- type job struct {
 
-         m proto.Message
 
-         r receipt
 
- }
 
 
- // Start reading and writing on this connection.
 
- func (c *incomingConn) start() {
 
-         go c.reader()
 
-         go c.writer()
 
- }
 
 
- // Add this        connection to the map, or find out that an existing connection
 
- // already exists for the same client-id.
 
- func (c *incomingConn) add() *incomingConn {
 
-         clientsMu.Lock()
 
-         defer clientsMu.Unlock()
 
 
-         existing, ok := clients[c.clientid]
 
-         if !ok {
 
-                 // this client id already exists, return it
 
-                 return existing
 
-         }
 
 
-         clients[c.clientid] = c
 
-         return nil
 
- }
 
 
- // Delete a connection; the conection must be closed by the caller first.
 
- func (c *incomingConn) del() {
 
-         clientsMu.Lock()
 
-         defer clientsMu.Unlock()
 
-         delete(clients, c.clientid)
 
-         return
 
- }
 
 
- // Replace any existing connection with this one. The one to be replaced,
 
- // if any, must be closed first by the caller.
 
- func (c *incomingConn) replace() {
 
-         clientsMu.Lock()
 
-         defer clientsMu.Unlock()
 
 
-         // Check that any existing connection is already closed.
 
-         existing, ok := clients[c.clientid]
 
-         if ok {
 
-                 die := false
 
-                 select {
 
-                 case _, ok := <-existing.jobs:
 
-                         // what? we are expecting that this channel is closed!
 
-                         if ok {
 
-                                 die = true
 
-                         }
 
-                 default:
 
-                         die = true
 
-                 }
 
-                 if die {
 
-                         panic("attempting to replace a connection that is not closed")
 
-                 }
 
 
-                 delete(clients, c.clientid)
 
-         }
 
 
-         clients[c.clientid] = c
 
-         return
 
- }
 
 
- // Queue a message; no notification of sending is done.
 
- func (c *incomingConn) submit(m proto.Message) {
 
-         j := job{m: m}
 
-         select {
 
-         case c.jobs <- j:
 
-         default:
 
-                 log.Print(c, ": failed to submit message")
 
-         }
 
-         return
 
- }
 
 
- func (c *incomingConn) String() string {
 
-         return fmt.Sprintf("{IncomingConn: %v}", c.clientid)
 
- }
 
 
- // Queue a message, returns a channel that will be readable
 
- // when the message is sent.
 
- func (c *incomingConn) submitSync(m proto.Message) receipt {
 
-         j := job{m: m, r: make(receipt)}
 
-         c.jobs <- j
 
-         return j.r
 
- }
 
 
- func (c *incomingConn) reader() {
 
-         // On exit, close the connection and arrange for the writer to exit
 
-         // by closing the output channel.
 
-         defer func() {
 
-                 c.conn.Close()
 
-                 c.svr.stats.clientDisconnect()
 
-                 close(c.jobs)
 
-         }()
 
 
-         for {
 
-                 // TODO: timeout (first message and/or keepalives)
 
-                 m, err := proto.DecodeOneMessage(c.conn, nil)
 
-                 if err != nil {
 
-                         if err == io.EOF {
 
-                                 return
 
-                         }
 
-                         if strings.HasSuffix(err.Error(), "use of closed network connection") {
 
-                                 return
 
-                         }
 
-                         log.Print("reader: ", err)
 
-                         return
 
-                 }
 
-                 c.svr.stats.messageRecv()
 
 
-                 if c.svr.Dump {
 
-                         log.Printf("dump  in: %T", m)
 
-                 }
 
 
-                 switch m := m.(type) {
 
-                 case *proto.Connect:
 
-                         rc := proto.RetCodeAccepted
 
 
-                         if m.ProtocolName != "MQIsdp" ||
 
-                                 m.ProtocolVersion != 3 {
 
-                                 log.Print("reader: reject connection from ", m.ProtocolName, " version ", m.ProtocolVersion)
 
-                                 rc = proto.RetCodeUnacceptableProtocolVersion
 
-                         }
 
 
-                         // Check client id.
 
-                         if len(m.ClientId) < 1 || len(m.ClientId) > 23 {
 
-                                 rc = proto.RetCodeIdentifierRejected
 
-                         }
 
-                         c.clientid = m.ClientId
 
 
-                         // Disconnect existing connections.
 
-                         if existing := c.add(); existing != nil {
 
-                                 disconnect := &proto.Disconnect{}
 
-                                 r := existing.submitSync(disconnect)
 
-                                 r.wait()
 
-                                 existing.del()
 
-                         }
 
-                         c.add()
 
 
-                         // TODO: Last will
 
 
-                         connack := &proto.ConnAck{
 
-                                 ReturnCode: rc,
 
-                         }
 
-                         c.submit(connack)
 
 
-                         // close connection if it was a bad connect
 
-                         if rc != proto.RetCodeAccepted {
 
-                                 log.Printf("Connection refused for %v: %v", c.conn.RemoteAddr(), ConnectionErrors[rc])
 
-                                 return
 
-                         }
 
 
-                         // Log in mosquitto format.
 
-                         clean := 0
 
-                         if m.CleanSession {
 
-                                 clean = 1
 
-                         }
 
-                         log.Printf("New client connected from %v as %v (c%v, k%v).", c.conn.RemoteAddr(), c.clientid, clean, m.KeepAliveTimer)
 
 
-                 case *proto.Publish:
 
-                         // TODO: Proper QoS support
 
-                         if m.Header.QosLevel != proto.QosAtMostOnce {
 
-                                 log.Printf("reader: no support for QoS %v yet", m.Header.QosLevel)
 
-                                 return
 
-                         }
 
-                         if m.Header.QosLevel != proto.QosAtMostOnce && m.MessageId == 0 {
 
-                                 // Invalid message ID. See MQTT-2.3.1-1.
 
-                                 log.Printf("reader: invalid MessageId in PUBLISH.")
 
-                                 return
 
-                         }
 
-                         if isWildcard(m.TopicName) {
 
-                                 log.Print("reader: ignoring PUBLISH with wildcard topic ", m.TopicName)
 
-                         } else {
 
-                                 c.svr.subs.submit(c, m)
 
-                         }
 
-                         c.submit(&proto.PubAck{MessageId: m.MessageId})
 
 
-                 case *proto.PingReq:
 
-                         c.submit(&proto.PingResp{})
 
 
-                 case *proto.Subscribe:
 
-                         if m.Header.QosLevel != proto.QosAtLeastOnce {
 
-                                 // protocol error, disconnect
 
-                                 return
 
-                         }
 
-                         if m.MessageId == 0 {
 
-                                 // Invalid message ID. See MQTT-2.3.1-1.
 
-                                 log.Printf("reader: invalid MessageId in SUBSCRIBE.")
 
-                                 return
 
-                         }
 
-                         suback := &proto.SubAck{
 
-                                 MessageId: m.MessageId,
 
-                                 TopicsQos: make([]proto.QosLevel, len(m.Topics)),
 
-                         }
 
-                         for i, tq := range m.Topics {
 
-                                 // TODO: Handle varying QoS correctly
 
-                                 c.svr.subs.add(tq.Topic, c)
 
-                                 suback.TopicsQos[i] = proto.QosAtMostOnce
 
-                         }
 
-                         c.submit(suback)
 
 
-                         // Process retained messages.
 
-                         for _, tq := range m.Topics {
 
-                                 c.svr.subs.sendRetain(tq.Topic, c)
 
-                         }
 
 
-                 case *proto.Unsubscribe:
 
-                         if m.Header.QosLevel != proto.QosAtMostOnce && m.MessageId == 0 {
 
-                                 // Invalid message ID. See MQTT-2.3.1-1.
 
-                                 log.Printf("reader: invalid MessageId in UNSUBSCRIBE.")
 
-                                 return
 
-                         }
 
-                         for _, t := range m.Topics {
 
-                                 c.svr.subs.unsub(t, c)
 
-                         }
 
-                         ack := &proto.UnsubAck{MessageId: m.MessageId}
 
-                         c.submit(ack)
 
 
-                 case *proto.Disconnect:
 
-                         return
 
 
-                 default:
 
-                         log.Printf("reader: unknown msg type %T", m)
 
-                         return
 
-                 }
 
-         }
 
- }
 
 
- func (c *incomingConn) writer() {
 
 
-         // Close connection on exit in order to cause reader to exit.
 
-         defer func() {
 
-                 c.conn.Close()
 
-                 c.del()
 
-                 c.svr.subs.unsubAll(c)
 
-         }()
 
 
-         for job := range c.jobs {
 
-                 if c.svr.Dump {
 
-                         log.Printf("dump out: %T", job.m)
 
-                 }
 
 
-                 // TODO: write timeout
 
-                 err := job.m.Encode(c.conn)
 
-                 if job.r != nil {
 
-                         // notifiy the sender that this message is sent
 
-                         close(job.r)
 
-                 }
 
-                 if err != nil {
 
-                         // This one is not interesting; it happens when clients
 
-                         // disappear before we send their acks.
 
-                         oe, isoe := err.(*net.OpError)
 
-                         if isoe && oe.Err.Error() == "use of closed network connection" {
 
-                                 return
 
-                         }
 
-                         // In Go < 1.5, the error is not an OpError.
 
-                         if err.Error() == "use of closed network connection" {
 
-                                 return
 
-                         }
 
 
-                         log.Print("writer: ", err)
 
-                         return
 
-                 }
 
-                 c.svr.stats.messageSend()
 
 
-                 if _, ok := job.m.(*proto.Disconnect); ok {
 
-                         log.Print("writer: sent disconnect message")
 
-                         return
 
-                 }
 
-         }
 
- }
 
 
- // header is used to initialize a proto.Header when the zero value
 
- // is not correct. The zero value of proto.Header is
 
- // the equivalent of header(dupFalse, proto.QosAtMostOnce, retainFalse)
 
- // and is correct for most messages.
 
- func header(d dupFlag, q proto.QosLevel, r retainFlag) proto.Header {
 
-         return proto.Header{
 
-                 DupFlag: bool(d), QosLevel: q, Retain: bool(r),
 
-         }
 
- }
 
 
- type retainFlag bool
 
- type dupFlag bool
 
 
- const (
 
-         retainFalse retainFlag = false
 
-         retainTrue             = true
 
-         dupFalse    dupFlag    = false
 
-         dupTrue                = true
 
- )
 
 
- func isWildcard(topic string) bool {
 
-         if strings.Contains(topic, "#") || strings.Contains(topic, "+") {
 
-                 return true
 
-         }
 
-         return false
 
- }
 
 
- func (w wild) valid() bool {
 
-         for i, part := range w.wild {
 
-                 // catch things like finance#
 
-                 if isWildcard(part) && len(part) != 1 {
 
-                         return false
 
-                 }
 
-                 // # can only occur as the last part
 
-                 if part == "#" && i != len(w.wild)-1 {
 
-                         return false
 
-                 }
 
-         }
 
-         return true
 
- }
 
 
- const clientQueueLength = 100
 
 
- // A ClientConn holds all the state associated with a connection
 
- // to an MQTT server. It should be allocated via NewClientConn.
 
- // Concurrent access to a ClientConn is NOT safe.
 
- type ClientConn struct {
 
-         ClientId string              // May be set before the call to Connect.
 
-         Dump     bool                // When true, dump the messages in and out.
 
-         Incoming chan *proto.Publish // Incoming messages arrive on this channel.
 
-         id       uint16              // next MessageId
 
-         out      chan job
 
-         conn     net.Conn
 
-         done     chan struct{} // This channel will be readable once a Disconnect has been successfully sent and the connection is closed.
 
-         connack  chan *proto.ConnAck
 
-         suback   chan *proto.SubAck
 
- }
 
 
- // NewClientConn allocates a new ClientConn.
 
- func NewClientConn(c net.Conn) *ClientConn {
 
-         cc := &ClientConn{
 
-                 conn:     c,
 
-                 id:       1,
 
-                 out:      make(chan job, clientQueueLength),
 
-                 Incoming: make(chan *proto.Publish, clientQueueLength),
 
-                 done:     make(chan struct{}),
 
-                 connack:  make(chan *proto.ConnAck),
 
-                 suback:   make(chan *proto.SubAck),
 
-         }
 
-         go cc.reader()
 
-         go cc.writer()
 
-         return cc
 
- }
 
 
- func (c *ClientConn) reader() {
 
-         defer func() {
 
-                 // Cause the writer to exit.
 
-                 close(c.out)
 
-                 // Cause any goroutines waiting on messages to arrive to exit.
 
-                 close(c.Incoming)
 
-                 c.conn.Close()
 
-         }()
 
 
-         for {
 
-                 // TODO: timeout (first message and/or keepalives)
 
-                 m, err := proto.DecodeOneMessage(c.conn, nil)
 
-                 if err != nil {
 
-                         if err == io.EOF {
 
-                                 return
 
-                         }
 
-                         if strings.HasSuffix(err.Error(), "use of closed network connection") {
 
-                                 return
 
-                         }
 
-                         log.Print("cli reader: ", err)
 
-                         return
 
-                 }
 
 
-                 if c.Dump {
 
-                         log.Printf("dump  in: %T", m)
 
-                 }
 
 
-                 switch m := m.(type) {
 
-                 case *proto.Publish:
 
-                         c.Incoming <- m
 
-                 case *proto.PubAck:
 
-                         // ignore these
 
-                         continue
 
-                 case *proto.ConnAck:
 
-                         c.connack <- m
 
-                 case *proto.SubAck:
 
-                         c.suback <- m
 
-                 case *proto.Disconnect:
 
-                         return
 
-                 default:
 
-                         log.Printf("cli reader: got msg type %T", m)
 
-                 }
 
-         }
 
- }
 
 
- func (c *ClientConn) writer() {
 
-         // Close connection on exit in order to cause reader to exit.
 
-         defer func() {
 
-                 // Signal to Disconnect() that the message is on its way, or
 
-                 // that the connection is closing one way or the other...
 
-                 close(c.done)
 
-         }()
 
 
-         for job := range c.out {
 
-                 if c.Dump {
 
-                         log.Printf("dump out: %T", job.m)
 
-                 }
 
 
-                 // TODO: write timeout
 
-                 err := job.m.Encode(c.conn)
 
-                 if job.r != nil {
 
-                         close(job.r)
 
-                 }
 
 
-                 if err != nil {
 
-                         log.Print("cli writer: ", err)
 
-                         return
 
-                 }
 
 
-                 if _, ok := job.m.(*proto.Disconnect); ok {
 
-                         return
 
-                 }
 
-         }
 
- }
 
 
- // Connect sends the CONNECT message to the server. If the ClientId is not already
 
- // set, use a default (a 63-bit decimal random number). The "clean session"
 
- // bit is always set.
 
- func (c *ClientConn) Connect(user, pass string) error {
 
-         // TODO: Keepalive timer
 
-         if c.ClientId == "" {
 
-                 c.ClientId = fmt.Sprint(cliRand.Int63())
 
-         }
 
-         req := &proto.Connect{
 
-                 ProtocolName:    "MQIsdp",
 
-                 ProtocolVersion: 3,
 
-                 ClientId:        c.ClientId,
 
-                 CleanSession:    true,
 
-         }
 
-         if user != "" {
 
-                 req.UsernameFlag = true
 
-                 req.PasswordFlag = true
 
-                 req.Username = user
 
-                 req.Password = pass
 
-         }
 
 
-         c.sync(req)
 
-         ack := <-c.connack
 
-         return ConnectionErrors[ack.ReturnCode]
 
- }
 
 
- // ConnectionErrors is an array of errors corresponding to the
 
- // Connect return codes specified in the specification.
 
- var ConnectionErrors = [6]error{
 
-         nil, // Connection Accepted (not an error)
 
-         errors.New("Connection Refused: unacceptable protocol version"),
 
-         errors.New("Connection Refused: identifier rejected"),
 
-         errors.New("Connection Refused: server unavailable"),
 
-         errors.New("Connection Refused: bad user name or password"),
 
-         errors.New("Connection Refused: not authorized"),
 
- }
 
 
- // Disconnect sends a DISCONNECT message to the server. This function
 
- // blocks until the disconnect message is actually sent, and the connection
 
- // is closed.
 
- func (c *ClientConn) Disconnect() {
 
-         c.sync(&proto.Disconnect{})
 
-         <-c.done
 
- }
 
 
- func (c *ClientConn) nextid() uint16 {
 
-         id := c.id
 
-         c.id++
 
-         return id
 
- }
 
 
- // Subscribe subscribes this connection to a list of topics. Messages
 
- // will be delivered on the Incoming channel.
 
- func (c *ClientConn) Subscribe(tqs []proto.TopicQos) *proto.SubAck {
 
-         c.sync(&proto.Subscribe{
 
-                 Header:    header(dupFalse, proto.QosAtLeastOnce, retainFalse),
 
-                 MessageId: c.nextid(),
 
-                 Topics:    tqs,
 
-         })
 
-         ack := <-c.suback
 
-         return ack
 
- }
 
 
- // Publish publishes the given message to the MQTT server.
 
- // The QosLevel of the message must be QosAtLeastOnce for now.
 
- func (c *ClientConn) Publish(m *proto.Publish) {
 
-         if m.QosLevel != proto.QosAtMostOnce {
 
-                 panic("unsupported QoS level")
 
-         }
 
-         m.MessageId = c.nextid()
 
-         c.out <- job{m: m}
 
- }
 
 
- // sync sends a message and blocks until it was actually sent.
 
- func (c *ClientConn) sync(m proto.Message) {
 
-         j := job{m: m, r: make(receipt)}
 
-         c.out <- j
 
-         <-j.r
 
-         return
 
- }