一个github上用go实现dht网络的例子

[复制链接]
 楼主| keer_zu 发表于 2016-12-6 14:58 | 显示全部楼层 |阅读模式
dht网络作为一种没有中心服务器的网络,是对等网络,p2p(peer to peer)的一种典型网络。实现算法很多,比如kademlia


代码(部分):
  1. package dht

  2. import (
  3.         "bytes"
  4.         "crypto/rand"
  5.         "encoding/hex"
  6.         "expvar"
  7.         "fmt"
  8.         "net"
  9.         "strconv"
  10.         "time"

  11.         log "github.com/golang/glog"
  12.         bencode "github.com/jackpal/bencode-go"
  13.         "github.com/nictuku/nettools"
  14. )

  15. // Search a node again after some time.
  16. var searchRetryPeriod = 15 * time.Second

  17. // Owned by the DHT engine.
  18. type remoteNode struct {
  19.         address net.UDPAddr
  20.         // addressDotFormatted contains a binary representation of the node's host:port address.
  21.         addressBinaryFormat string
  22.         id                  string
  23.         // lastQueryID should be incremented after consumed. Based on the
  24.         // protocol, it would be two letters, but I'm using 0-255, although
  25.         // treated as string.
  26.         lastQueryID int
  27.         // TODO: key by infohash instead?
  28.         pendingQueries   map[string]*queryType // key: transaction ID
  29.         pastQueries      map[string]*queryType // key: transaction ID
  30.         reachable        bool
  31.         lastResponseTime time.Time
  32.         lastSearchTime   time.Time
  33.         ActiveDownloads  []string // List of infohashes we know this peer is downloading.
  34. }

  35. func newRemoteNode(addr net.UDPAddr, id string) *remoteNode {
  36.         return &remoteNode{
  37.                 address:             addr,
  38.                 addressBinaryFormat: nettools.DottedPortToBinary(addr.String()),
  39.                 lastQueryID:         newTransactionId(),
  40.                 id:                  id,
  41.                 reachable:           false,
  42.                 pendingQueries:      map[string]*queryType{},
  43.                 pastQueries:         map[string]*queryType{},
  44.         }
  45. }

  46. type queryType struct {
  47.         Type    string
  48.         ih      InfoHash
  49.         srcNode string
  50. }

  51. const (
  52.         // Once in a while I get a few bigger ones, but meh.
  53.         maxUDPPacketSize = 4096
  54.         v4nodeContactLen = 26
  55.         v6nodeContactLen = 38 // some clients seem to send multiples of 38
  56.         nodeIdLen        = 20
  57. )

  58. var (
  59.         totalSent         = expvar.NewInt("totalSent")
  60.         totalReadBytes    = expvar.NewInt("totalReadBytes")
  61.         totalWrittenBytes = expvar.NewInt("totalWrittenBytes")
  62. )

  63. // The 'nodes' response is a string with fixed length contacts concatenated arbitrarily.
  64. func parseNodesString(nodes string, proto string) (parsed map[string]string) {
  65.         var nodeContactLen int
  66.         if proto == "udp4" {
  67.                 nodeContactLen = v4nodeContactLen
  68.         } else if proto == "udp6" {
  69.                 nodeContactLen = v6nodeContactLen
  70.         } else {
  71.                 return
  72.         }
  73.         parsed = make(map[string]string)
  74.         if len(nodes)%nodeContactLen > 0 {
  75.                 log.V(3).Infof("DHT: len(NodeString) = %d, INVALID LENGTH, should be a multiple of %d", len(nodes), nodeContactLen)
  76.                 log.V(5).Infof("%T %#v\n", nodes, nodes)
  77.                 return
  78.         } else {
  79.                 log.V(5).Infof("DHT: len(NodeString) = %d, had %d nodes, nodeContactLen=%d\n", len(nodes), len(nodes)/nodeContactLen, nodeContactLen)
  80.         }
  81.         for i := 0; i < len(nodes); i += nodeContactLen {
  82.                 id := nodes[i : i+nodeIdLen]
  83.                 address := nettools.BinaryToDottedPort(nodes[i+nodeIdLen : i+nodeContactLen])
  84.                 parsed[id] = address
  85.         }
  86.         return

  87. }

  88. // newQuery creates a new transaction id and adds an entry to r.pendingQueries.
  89. // It does not set any extra information to the transaction information, so the
  90. // caller must take care of that.
  91. func (r *remoteNode) newQuery(transType string) (transId string) {
  92.         log.V(4).Infof("newQuery for %x, lastID %v", r.id, r.lastQueryID)
  93.         r.lastQueryID = (r.lastQueryID + 1) % 256
  94.         transId = strconv.Itoa(r.lastQueryID)
  95.         log.V(4).Infof("... new id %v", r.lastQueryID)
  96.         r.pendingQueries[transId] = &queryType{Type: transType}
  97.         return
  98. }

  99. // wasContactedRecently returns true if a node was contacted recently _and_
  100. // one of the recent queries (not necessarily the last) was about the ih. If
  101. // the ih is different at each time, it will keep returning false.
  102. func (r *remoteNode) wasContactedRecently(ih InfoHash) bool {
  103.         if len(r.pendingQueries) == 0 && len(r.pastQueries) == 0 {
  104.                 return false
  105.         }
  106.         if !r.lastResponseTime.IsZero() && time.Since(r.lastResponseTime) > searchRetryPeriod {
  107.                 return false
  108.         }
  109.         for _, q := range r.pendingQueries {
  110.                 if q.ih == ih {
  111.                         return true
  112.                 }
  113.         }
  114.         if !r.lastSearchTime.IsZero() && time.Since(r.lastSearchTime) > searchRetryPeriod {
  115.                 return false
  116.         }
  117.         for _, q := range r.pastQueries {
  118.                 if q.ih == ih {
  119.                         return true
  120.                 }
  121.         }
  122.         return false
  123. }

  124. type getPeersResponse struct {
  125.         // TODO: argh, values can be a string depending on the client (e.g: original bittorrent).
  126.         Values []string "values"
  127.         Id     string   "id"
  128.         Nodes  string   "nodes"
  129.         Nodes6 string   "nodes6"
  130.         Token  string   "token"
  131. }

  132. type answerType struct {
  133.         Id       string   "id"
  134.         Target   string   "target"
  135.         InfoHash InfoHash "info_hash" // should probably be a string.
  136.         Port     int      "port"
  137.         Token    string   "token"
  138. }

  139. // Generic stuff we read from the wire, not knowing what it is. This is as generic as can be.
  140. type responseType struct {
  141.         T string           "t"
  142.         Y string           "y"
  143.         Q string           "q"
  144.         R getPeersResponse "r"
  145.         E []string         "e"
  146.         A answerType       "a"
  147.         // Unsupported mainline extension for client identification.
  148.         // V string(?)        "v"
  149. }

  150. // sendMsg bencodes the data in 'query' and sends it to the remote node.
  151. func sendMsg(conn *net.UDPConn, raddr net.UDPAddr, query interface{}) {
  152.         totalSent.Add(1)
  153.         var b bytes.Buffer
  154.         if err := bencode.Marshal(&b, query); err != nil {
  155.                 return
  156.         }
  157.         if n, err := conn.WriteToUDP(b.Bytes(), &raddr); err != nil {
  158.                 log.V(3).Infof("DHT: node write failed to %+v, error=%s", raddr, err)
  159.         } else {
  160.                 totalWrittenBytes.Add(int64(n))
  161.         }
  162.         return
  163. }

  164. // Read responses from bencode-speaking nodes. Return the appropriate data structure.
  165. func readResponse(p packetType) (response responseType, err error) {
  166.         // The calls to bencode.Unmarshal() can be fragile.
  167.         defer func() {
  168.                 if x := recover(); x != nil {
  169.                         log.V(3).Infof("DHT: !!! Recovering from panic() after bencode.Unmarshal %q, %v", string(p.b), x)
  170.                 }
  171.         }()
  172.         if e2 := bencode.Unmarshal(bytes.NewBuffer(p.b), &response); e2 == nil {
  173.                 err = nil
  174.                 return
  175.         } else {
  176.                 log.V(3).Infof("DHT: unmarshal error, odd or partial data during UDP read? %v, err=%s", string(p.b), e2)
  177.                 return response, e2
  178.         }
  179.         return
  180. }

  181. // Message to be sent out in the wire. Must not have any extra fields.
  182. type queryMessage struct {
  183.         T string                 "t"
  184.         Y string                 "y"
  185.         Q string                 "q"
  186.         A map[string]interface{} "a"
  187. }

  188. type replyMessage struct {
  189.         T string                 "t"
  190.         Y string                 "y"
  191.         R map[string]interface{} "r"
  192. }

  193. type packetType struct {
  194.         b     []byte
  195.         raddr net.UDPAddr
  196. }

  197. func listen(addr string, listenPort int, proto string) (socket *net.UDPConn, err error) {
  198.         log.V(3).Infof("DHT: Listening for peers on IP: %s port: %d Protocol=%s\n", addr, listenPort, proto)
  199.         listener, err := net.ListenPacket(proto, addr+":"+strconv.Itoa(listenPort))
  200.         if err != nil {
  201.                 log.V(3).Infof("DHT: Listen failed:", err)
  202.         }
  203.         if listener != nil {
  204.                 socket = listener.(*net.UDPConn)
  205.         }
  206.         return
  207. }

  208. // Read from UDP socket, writes slice of byte into channel.
  209. func readFromSocket(socket *net.UDPConn, conChan chan packetType, bytesArena arena, stop chan bool) {
  210.         for {
  211.                 b := bytesArena.Pop()
  212.                 n, addr, err := socket.ReadFromUDP(b)
  213.                 if err != nil {
  214.                         log.V(3).Infof("DHT: readResponse error:", err)
  215.                 }
  216.                 b = b[0:n]
  217.                 if n == maxUDPPacketSize {
  218.                         log.V(3).Infof("DHT: Warning. Received packet with len >= %d, some data may have been discarded.\n", maxUDPPacketSize)
  219.                 }
  220.                 totalReadBytes.Add(int64(n))
  221.                 if n > 0 && err == nil {
  222.                         p := packetType{b, *addr}
  223.                         select {
  224.                         case conChan <- p:
  225.                                 continue
  226.                         case <-stop:
  227.                                 return
  228.                         }
  229.                 }
  230.                 // Do a non-blocking read of the stop channel and stop this goroutine if the channel
  231.                 // has been closed.
  232.                 select {
  233.                 case <-stop:
  234.                         return
  235.                 default:
  236.                 }
  237.         }
  238. }

  239. func bogusId(id string) bool {
  240.         return len(id) != 20
  241. }

  242. func newTransactionId() int {
  243.         n, err := rand.Read(make([]byte, 1))
  244.         if err != nil {
  245.                 return time.Now().Second()
  246.         }
  247.         return n
  248. }

  249. type InfoHash string

  250. func (i InfoHash) String() string {
  251.         return fmt.Sprintf("%x", string(i))
  252. }

  253. // DecodeInfoHash transforms a hex-encoded 20-characters string to a binary
  254. // infohash.
  255. func DecodeInfoHash(x string) (b InfoHash, err error) {
  256.         var h []byte
  257.         h, err = hex.DecodeString(x)
  258.         if len(h) != 20 {
  259.                 return "", fmt.Errorf("DecodeInfoHash: expected InfoHash len=20, got %d", len(h))
  260.         }
  261.         return InfoHash(h), err
  262. }

  263. // DecodePeerAddress transforms the binary-encoded host:port address into a
  264. // human-readable format. So, "abcdef" becomes 97.98.99.100:25958.
  265. func DecodePeerAddress(x string) string {
  266.         return nettools.BinaryToDottedPort(x)
  267. }

 楼主| keer_zu 发表于 2016-12-6 15:01 | 显示全部楼层
上面代码完成dht网络中节点(node)和节点之间的沟通。
 楼主| keer_zu 发表于 2016-12-6 15:04 | 显示全部楼层
  1. // DHT node for Taipei Torrent, for tracker-less peer information exchange.
  2. // Status: Supports all DHT operations from the specification.

  3. package dht

  4. // Summary from the bittorrent DHT protocol specification:
  5. //
  6. // Message types:
  7. //  - query
  8. //  - response
  9. //  - error
  10. //
  11. // RPCs:
  12. //      ping:
  13. //         see if node is reachable and save it on routing table.
  14. //      find_node:
  15. //               run when DHT node count drops, or every X minutes. Just to ensure
  16. //               our DHT routing table is still useful.
  17. //      get_peers:
  18. //               the real deal. Iteratively queries DHT nodes and find new sources
  19. //               for a particular infohash.
  20. //        announce_peer:
  21. //         announce that the peer associated with this node is downloading a
  22. //         torrent.
  23. //
  24. // Reference:
  25. //     http://www.bittorrent.org/beps/bep_0005.html
  26. //

  27. import (
  28.         "crypto/rand"
  29.         "crypto/sha1"
  30.         "expvar"
  31.         "flag"
  32.         "fmt"
  33.         "io"
  34.         "net"
  35.         "strings"
  36.         "sync"
  37.         "time"

  38.         log "github.com/golang/glog"
  39.         "github.com/nictuku/nettools"
  40. )

  41. // ===== Logging behavior =====
  42. // By default nothing will be output on the screen, unless the program is
  43. // called with special flags like -logtostderr. To show really important
  44. // messages, use fmt.Print functions. The general rules for log verbosity are:
  45. // - Major errors are logged with V(0) (or just ommit the V).
  46. // - Rare events of major importance like the startup message should be logged
  47. // with V(1).
  48. // - Events related to user input (e.g: more peers requested, or peers found)
  49. // should be logged with V(2).
  50. // - Protocol errors should be logged with V(3). They are often noisy.
  51. // - New incoming and outgoing KRPCs should also be logged with V(3).
  52. // - Debugging and tracing should be logged with V(4) or higher.

  53. // Config for the DHT Node. Use NewConfig to create a configuration with default values.
  54. type Config struct {
  55.         // IP Address to listen on.  If left blank, one is chosen automatically.
  56.         Address string
  57.         // UDP port the DHT node should listen on. If zero, it picks a random port.
  58.         Port int
  59.         // Number of peers that DHT will try to find for each infohash being searched. This might
  60.         // later be moved to a per-infohash option. Default value: 5.
  61.         NumTargetPeers int
  62.         // Comma separated list of DHT routers used for bootstrapping the network.
  63.         DHTRouters string
  64.         // Maximum number of nodes to store in the routing table. Default value: 100.
  65.         MaxNodes int
  66.         // How often to ping nodes in the network to see if they are reachable. Default value: 15 min.
  67.         CleanupPeriod time.Duration
  68.         //  If true, the node will read the routing table from disk on startup and save routing
  69.         //  table snapshots on disk every few minutes. Default value: true.
  70.         SaveRoutingTable bool
  71.         // How often to save the routing table to disk. Default value: 5 minutes.
  72.         SavePeriod time.Duration
  73.         // Maximum packets per second to be processed. Disabled if negative. Default value: 100.
  74.         RateLimit int64
  75.         // MaxInfoHashes is the limit of number of infohashes for which we should keep a peer list.
  76.         // If this and MaxInfoHashPeers are unchanged, it should consume around 25 MB of RAM. Larger
  77.         // values help keeping the DHT network healthy. Default value: 2048.
  78.         MaxInfoHashes int
  79.         // MaxInfoHashPeers is the limit of number of peers to be tracked for each infohash. A
  80.         // single peer contact typically consumes 6 bytes. Default value: 256.
  81.         MaxInfoHashPeers int
  82.         // ClientPerMinuteLimit protects against spammy clients. Ignore their requests if exceeded
  83.         // this number of packets per minute. Default value: 50.
  84.         ClientPerMinuteLimit int
  85.         // ThrottlerTrackedClients is the number of hosts the client throttler remembers. An LRU is used to
  86.         // track the most interesting ones. Default value: 1000.
  87.         ThrottlerTrackedClients int64
  88.         //Protocol for UDP connections, udp4= IPv4, udp6 = IPv6
  89.         UDPProto string
  90. }

  91. // Creates a *Config populated with default values.
  92. func NewConfig() *Config {
  93.         return &Config{
  94.                 Address:                 "",
  95.                 Port:                    0, // Picks a random port.
  96.                 NumTargetPeers:          5,
  97.                 DHTRouters:              "router.magnets.im:6881,router.bittorrent.com:6881,dht.transmissionbt.com:6881",
  98.                 MaxNodes:                500,
  99.                 CleanupPeriod:           15 * time.Minute,
  100.                 SaveRoutingTable:        true,
  101.                 SavePeriod:              5 * time.Minute,
  102.                 RateLimit:               100,
  103.                 MaxInfoHashes:           2048,
  104.                 MaxInfoHashPeers:        256,
  105.                 ClientPerMinuteLimit:    50,
  106.                 ThrottlerTrackedClients: 1000,
  107.                 UDPProto:                "udp4",
  108.         }
  109. }

  110. var DefaultConfig = NewConfig()

  111. // Registers Config fields as command line flags.  If c is nil, DefaultConfig
  112. // is used.
  113. func RegisterFlags(c *Config) {
  114.         if c == nil {
  115.                 c = DefaultConfig
  116.         }
  117.         flag.StringVar(&c.DHTRouters, "routers", c.DHTRouters,
  118.                 "Comma separated addresses of DHT routers used to bootstrap the DHT network.")
  119.         flag.IntVar(&c.MaxNodes, "maxNodes", c.MaxNodes,
  120.                 "Maximum number of nodes to store in the routing table, in memory. This is the primary configuration for how noisy or aggressive this node should be. When the node starts, it will try to reach d.config.MaxNodes/2 as quick as possible, to form a healthy routing table.")
  121.         flag.DurationVar(&c.CleanupPeriod, "cleanupPeriod", c.CleanupPeriod,
  122.                 "How often to ping nodes in the network to see if they are reachable.")
  123.         flag.DurationVar(&c.SavePeriod, "savePeriod", c.SavePeriod,
  124.                 "How often to save the routing table to disk.")
  125.         flag.Int64Var(&c.RateLimit, "rateLimit", c.RateLimit,
  126.                 "Maximum packets per second to be processed. Beyond this limit they are silently dropped. Set to -1 to disable rate limiting.")
  127. }

  128. const (
  129.         // Try to ensure that at least these many nodes are in the routing table.
  130.         minNodes           = 16
  131.         secretRotatePeriod = 5 * time.Minute
  132. )

  133. // DHT should be created by New(). It provides DHT features to a torrent
  134. // client, such as finding new peers for torrent downloads without requiring a
  135. // tracker.
  136. type DHT struct {
  137.         nodeId                 string
  138.         config                 Config
  139.         routingTable           *routingTable
  140.         peerStore              *peerStore
  141.         conn                   *net.UDPConn
  142.         Logger                 Logger
  143.         exploredNeighborhood   bool
  144.         remoteNodeAcquaintance chan string
  145.         peersRequest           chan ihReq
  146.         nodesRequest           chan ihReq
  147.         pingRequest            chan *remoteNode
  148.         portRequest            chan int
  149.         stop                   chan bool
  150.         wg                     sync.WaitGroup
  151.         clientThrottle         *nettools.ClientThrottle
  152.         store                  *dhtStore
  153.         tokenSecrets           []string

  154.         // Public channels:
  155.         PeersRequestResults chan map[InfoHash][]string // key = infohash, v = slice of peers.
  156. }

  157. // New creates a DHT node. If config is nil, DefaultConfig will be used.
  158. // Changing the config after calling this function has no effect.
  159. //
  160. // This method replaces NewDHTNode.
  161. func New(config *Config) (node *DHT, err error) {
  162.         if config == nil {
  163.                 config = DefaultConfig
  164.         }
  165.         // Copy to avoid changes.
  166.         cfg := *config
  167.         node = &DHT{
  168.                 config:               cfg,
  169.                 routingTable:         newRoutingTable(),
  170.                 peerStore:            newPeerStore(cfg.MaxInfoHashes, cfg.MaxInfoHashPeers),
  171.                 PeersRequestResults:  make(chan map[InfoHash][]string, 1),
  172.                 stop:                 make(chan bool),
  173.                 exploredNeighborhood: false,
  174.                 // Buffer to avoid blocking on sends.
  175.                 remoteNodeAcquaintance: make(chan string, 100),
  176.                 // Buffer to avoid deadlocks and blocking on sends.
  177.                 peersRequest:   make(chan ihReq, 100),
  178.                 nodesRequest:   make(chan ihReq, 100),
  179.                 pingRequest:    make(chan *remoteNode),
  180.                 portRequest:    make(chan int),
  181.                 clientThrottle: nettools.NewThrottler(cfg.ClientPerMinuteLimit, cfg.ThrottlerTrackedClients),
  182.                 tokenSecrets:   []string{newTokenSecret(), newTokenSecret()},
  183.         }
  184.         c := openStore(cfg.Port, cfg.SaveRoutingTable)
  185.         node.store = c
  186.         if len(c.Id) != 20 {
  187.                 c.Id = randNodeId()
  188.                 log.V(4).Infof("Using a new random node ID: %x %d", c.Id, len(c.Id))
  189.                 saveStore(*c)
  190.         }
  191.         // The types don't match because JSON marshalling needs []byte.
  192.         node.nodeId = string(c.Id)

  193.         // XXX refactor.
  194.         node.routingTable.nodeId = node.nodeId

  195.         // This is called before the engine is up and ready to read from the
  196.         // underlying channel.
  197.         node.wg.Add(1)
  198.         go func() {
  199.                 defer node.wg.Done()
  200.                 for addr, _ := range c.Remotes {
  201.                         node.AddNode(addr)
  202.                 }
  203.         }()
  204.         return
  205. }

  206. func newTokenSecret() string {
  207.         b := make([]byte, 5)
  208.         if _, err := rand.Read(b); err != nil {
  209.                 // This would return a string with up to 5 null chars.
  210.                 log.Warningf("DHT: failed to generate random newTokenSecret: %v", err)
  211.         }
  212.         return string(b)
  213. }

  214. // Logger allows the DHT client to attach hooks for certain RPCs so it can log
  215. // interesting events any way it wants.
  216. type Logger interface {
  217.         GetPeers(addr net.UDPAddr, queryID string, infoHash InfoHash)
  218. }

  219. type ihReq struct {
  220.         ih       InfoHash
  221.         announce bool
  222. }

  223. // PeersRequest asks the DHT to search for more peers for the infoHash
  224. // provided. announce should be true if the connected peer is actively
  225. // downloading this infohash, which is normally the case - unless this DHT node
  226. // is just a router that doesn't downloads torrents.
  227. func (d *DHT) PeersRequest(ih string, announce bool) {
  228.         d.peersRequest <- ihReq{InfoHash(ih), announce}
  229.         log.V(2).Infof("DHT: torrent client asking more peers for %x.", ih)
  230. }

  231. // Stop the DHT node.
  232. func (d *DHT) Stop() {
  233.         close(d.stop)
  234.         d.wg.Wait()
  235. }

  236. // Port returns the port number assigned to the DHT. This is useful when
  237. // when initialising the DHT with port 0, i.e. automatic port assignment,
  238. // in order to retrieve the actual port number used.
  239. func (d *DHT) Port() int {
  240.         return <-d.portRequest
  241. }

  242. // AddNode informs the DHT of a new node it should add to its routing table.
  243. // addr is a string containing the target node's "host:port" UDP address.
  244. func (d *DHT) AddNode(addr string) {
  245.         d.remoteNodeAcquaintance <- addr
  246. }

  247. // Asks for more peers for a torrent.
  248. func (d *DHT) getPeers(infoHash InfoHash) {
  249.         closest := d.routingTable.lookupFiltered(infoHash)
  250.         if len(closest) == 0 {
  251.                 for _, s := range strings.Split(d.config.DHTRouters, ",") {
  252.                         if s != "" {
  253.                                 r, e := d.routingTable.getOrCreateNode("", s, d.config.UDPProto)
  254.                                 if e == nil {
  255.                                         d.getPeersFrom(r, infoHash)
  256.                                 }
  257.                         }
  258.                 }
  259.         }
  260.         for _, r := range closest {
  261.                 d.getPeersFrom(r, infoHash)
  262.         }
  263. }

  264. // Find a DHT node.
  265. func (d *DHT) findNode(id string) {
  266.         ih := InfoHash(id)
  267.         closest := d.routingTable.lookupFiltered(ih)
  268.         if len(closest) == 0 {
  269.                 for _, s := range strings.Split(d.config.DHTRouters, ",") {
  270.                         if s != "" {
  271.                                 r, e := d.routingTable.getOrCreateNode("", s, d.config.UDPProto)
  272.                                 if e == nil {
  273.                                         d.findNodeFrom(r, id)
  274.                                 }
  275.                         }
  276.                 }
  277.         }
  278.         for _, r := range closest {
  279.                 d.findNodeFrom(r, id)
  280.         }
  281. }

  282. // Start launches the dht node. It starts a listener
  283. // on the desired address, then runs the main loop in a
  284. // separate go routine - Start replaces Run and will
  285. // always return, with nil if the dht successfully
  286. // started or with an error either. d.Stop() is expected
  287. // by the caller to stop the dht
  288. func (d *DHT) Start() (err error) {
  289.         if err = d.initSocket(); err == nil {
  290.                 d.wg.Add(1)
  291.                 go func() {
  292.                         defer d.wg.Done()
  293.                         d.loop()
  294.                 }()
  295.         }
  296.         return err
  297. }

  298. // Run launches the dht node. It starts a listener
  299. // on the desired address, then runs the main loop in the
  300. // same go routine.
  301. // If initSocket fails, Run returns with the error.
  302. // If initSocket succeeds, Run blocks until d.Stop() is called.
  303. // DEPRECATED - Start should be used instead of Run
  304. func (d *DHT) Run() error {
  305.         log.Warning("dht.Run() is deprecated, use dht.Start() instead")
  306.         if err := d.initSocket(); err != nil {
  307.                 return err
  308.         }
  309.         d.loop()
  310.         return nil
  311. }

  312. // initSocket initializes the udp socket
  313. // listening to incoming dht requests
  314. func (d *DHT) initSocket() (err error) {
  315.         d.conn, err = listen(d.config.Address, d.config.Port, d.config.UDPProto)
  316.         if err != nil {
  317.                 return err
  318.         }

  319.         // Update the stored port number in case it was set 0, meaning it was
  320.         // set automatically by the system
  321.         d.config.Port = d.conn.LocalAddr().(*net.UDPAddr).Port
  322.         return nil
  323. }

  324. func (d *DHT) bootstrap() {
  325.         // Bootstrap the network (only if there are configured dht routers).
  326.         for _, s := range strings.Split(d.config.DHTRouters, ",") {
  327.                 if s != "" {
  328.                         d.ping(s)
  329.                         r, e := d.routingTable.getOrCreateNode("", s, d.config.UDPProto)
  330.                         if e == nil {
  331.                                 d.findNodeFrom(r, d.nodeId)
  332.                         }
  333.                 }
  334.         }
  335.         d.findNode(d.nodeId)
  336.         d.getMorePeers(nil)
  337. }

  338. // loop is the main working section of dht.
  339. // It bootstraps a routing table, if necessary,
  340. // and listens for incoming DHT requests until d.Stop()
  341. // is called from another go routine.
  342. func (d *DHT) loop() {
  343.         // Close socket
  344.         defer d.conn.Close()

  345.         // There is goroutine pushing and one popping items out of the arena.
  346.         // One passes work to the other. So there is little contention in the
  347.         // arena, so it doesn't need many items (it used to have 500!). If
  348.         // readFromSocket or the packet processing ever need to be
  349.         // parallelized, this would have to be bumped.
  350.         bytesArena := newArena(maxUDPPacketSize, 3)
  351.         socketChan := make(chan packetType)
  352.         d.wg.Add(1)
  353.         go func() {
  354.                 defer d.wg.Done()
  355.                 readFromSocket(d.conn, socketChan, bytesArena, d.stop)
  356.         }()

  357.         d.bootstrap()

  358.         cleanupTicker := time.Tick(d.config.CleanupPeriod)
  359.         secretRotateTicker := time.Tick(secretRotatePeriod)

  360.         saveTicker := make(<-chan time.Time)
  361.         if d.store != nil {
  362.                 saveTicker = time.Tick(d.config.SavePeriod)
  363.         }

  364.         var fillTokenBucket <-chan time.Time
  365.         tokenBucket := d.config.RateLimit

  366.         if d.config.RateLimit < 0 {
  367.                 log.Warning("rate limiting disabled")
  368.         } else {
  369.                 // Token bucket for limiting the number of packets per second.
  370.                 fillTokenBucket = time.Tick(time.Second / 10)
  371.                 if d.config.RateLimit > 0 && d.config.RateLimit < 10 {
  372.                         // Less than 10 leads to rounding problems.
  373.                         d.config.RateLimit = 10
  374.                 }
  375.         }
  376.         log.V(1).Infof("DHT: Starting DHT node %x on port %d.", d.nodeId, d.config.Port)

  377.         for {
  378.                 select {
  379.                 case <-d.stop:
  380.                         log.V(1).Infof("DHT exiting.")
  381.                         d.clientThrottle.Stop()
  382.                         log.Flush()
  383.                         return
  384.                 case addr := <-d.remoteNodeAcquaintance:
  385.                         d.helloFromPeer(addr)
  386.                 case req := <-d.peersRequest:
  387.                         // torrent server is asking for more peers for infoHash.  Ask the closest
  388.                         // nodes for directions. The goroutine will write into the
  389.                         // PeersNeededResults channel.

  390.                         // Drain all requests sitting in the channel and de-dupe them.
  391.                         m := map[InfoHash]bool{req.ih: req.announce}
  392.                 P:
  393.                         for {
  394.                                 select {
  395.                                 case req = <-d.peersRequest:
  396.                                         m[req.ih] = req.announce
  397.                                 default:
  398.                                         // Channel drained.
  399.                                         break P
  400.                                 }
  401.                         }
  402.                         // Process each unique infohash for which there were requests.
  403.                         for ih, announce := range m {
  404.                                 if announce {
  405.                                         d.peerStore.addLocalDownload(ih)
  406.                                 }

  407.                                 d.getPeers(ih) // I might have enough peers in the peerstore, but no seeds
  408.                         }

  409.                 case req := <-d.nodesRequest:
  410.                         m := map[InfoHash]bool{req.ih: true}
  411.                 L:
  412.                         for {
  413.                                 select {
  414.                                 case req = <-d.nodesRequest:
  415.                                         m[req.ih] = true
  416.                                 default:
  417.                                         // Channel drained.
  418.                                         break L
  419.                                 }
  420.                         }
  421.                         for ih, _ := range m {
  422.                                 d.findNode(string(ih))
  423.                         }

  424.                 case p := <-socketChan:
  425.                         totalRecv.Add(1)
  426.                         if d.config.RateLimit > 0 {
  427.                                 if tokenBucket > 0 {
  428.                                         d.processPacket(p)
  429.                                         tokenBucket -= 1
  430.                                 } else {
  431.                                         // TODO In the future it might be better to avoid dropping things like ping replies.
  432.                                         totalDroppedPackets.Add(1)
  433.                                 }
  434.                         } else {
  435.                                 d.processPacket(p)
  436.                         }
  437.                         bytesArena.Push(p.b)

  438.                 case <-fillTokenBucket:
  439.                         if tokenBucket < d.config.RateLimit {
  440.                                 tokenBucket += d.config.RateLimit / 10
  441.                         }
  442.                 case <-cleanupTicker:
  443.                         needPing := d.routingTable.cleanup(d.config.CleanupPeriod, d.peerStore)
  444.                         d.wg.Add(1)
  445.                         go func() {
  446.                                 defer d.wg.Done()
  447.                                 pingSlowly(d.pingRequest, needPing, d.config.CleanupPeriod, d.stop)
  448.                         }()
  449.                         if d.needMoreNodes() {
  450.                                 d.bootstrap()
  451.                         }
  452.                 case node := <-d.pingRequest:
  453.                         d.pingNode(node)
  454.                 case <-secretRotateTicker:
  455.                         d.tokenSecrets = []string{newTokenSecret(), d.tokenSecrets[0]}
  456.                 case d.portRequest <- d.config.Port:
  457.                         continue
  458.                 case <-saveTicker:
  459.                         tbl := d.routingTable.reachableNodes()
  460.                         if len(tbl) > 5 {
  461.                                 d.store.Remotes = tbl
  462.                                 saveStore(*d.store)
  463.                         }
  464.                 }
  465.         }
  466. }


 楼主| keer_zu 发表于 2016-12-6 15:05 | 显示全部楼层
  1. func (d *DHT) needMoreNodes() bool {
  2.         n := d.routingTable.numNodes()
  3.         return n < minNodes || n*2 < d.config.MaxNodes
  4. }

  5. func (d *DHT) needMorePeers(ih InfoHash) bool {
  6.         return d.peerStore.alive(ih) < d.config.NumTargetPeers
  7. }

  8. func (d *DHT) getMorePeers(r *remoteNode) {
  9.         for ih := range d.peerStore.localActiveDownloads {
  10.                 if d.needMorePeers(ih) {
  11.                         if r == nil {
  12.                                 d.getPeers(ih)
  13.                         } else {
  14.                                 d.getPeersFrom(r, ih)
  15.                         }
  16.                 }
  17.         }
  18. }

  19. func (d *DHT) helloFromPeer(addr string) {
  20.         // We've got a new node id. We need to:
  21.         // - see if we know it already, skip accordingly.
  22.         // - ping it and see if it's reachable.
  23.         // - if it responds, save it in the routing table.
  24.         _, addrResolved, existed, err := d.routingTable.hostPortToNode(addr, d.config.UDPProto)
  25.         if err != nil {
  26.                 log.Warningf("helloFromPeer error: %v", err)
  27.                 return
  28.         }
  29.         if existed {
  30.                 // Node host+port already known.
  31.                 return
  32.         }
  33.         if d.routingTable.length() < d.config.MaxNodes {
  34.                 d.ping(addrResolved)
  35.                 return
  36.         }
  37. }

  38. func (d *DHT) processPacket(p packetType) {
  39.         log.V(5).Infof("DHT processing packet from %v", p.raddr.String())
  40.         if !d.clientThrottle.CheckBlock(p.raddr.IP.String()) {
  41.                 totalPacketsFromBlockedHosts.Add(1)
  42.                 log.V(5).Infof("Node exceeded rate limiter. Dropping packet.")
  43.                 return
  44.         }
  45.         if p.b[0] != 'd' {
  46.                 // Malformed DHT packet. There are protocol extensions out
  47.                 // there that we don't support or understand.
  48.                 log.V(5).Infof("Malformed DHT packet.")
  49.                 return
  50.         }
  51.         r, err := readResponse(p)
  52.         if err != nil {
  53.                 log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
  54.                 return
  55.         }
  56.         switch {
  57.         // Response.
  58.         case r.Y == "r":
  59.                 log.V(5).Infof("DHT processing response from %x", r.R.Id)
  60.                 if bogusId(r.R.Id) {
  61.                         log.V(3).Infof("DHT received packet with bogus node id %x", r.R.Id)
  62.                         return
  63.                 }
  64.                 if r.R.Id == d.nodeId {
  65.                         log.V(3).Infof("DHT received reply from self, id %x", r.A.Id)
  66.                         return
  67.                 }
  68.                 node, addr, existed, err := d.routingTable.hostPortToNode(p.raddr.String(), d.config.UDPProto)
  69.                 if err != nil {
  70.                         log.V(3).Infof("DHT readResponse error processing response: %v", err)
  71.                         return
  72.                 }
  73.                 if !existed {
  74.                         log.V(3).Infof("DHT: Received reply from a host we don't know: %v", p.raddr)
  75.                         if d.routingTable.length() < d.config.MaxNodes {
  76.                                 d.ping(addr)
  77.                         }
  78.                         return
  79.                 }
  80.                 // Fix the node ID.
  81.                 if node.id == "" {
  82.                         node.id = r.R.Id
  83.                         d.routingTable.update(node, d.config.UDPProto)
  84.                 }
  85.                 if node.id != r.R.Id {
  86.                         log.V(3).Infof("DHT: Node changed IDs %x => %x", node.id, r.R.Id)
  87.                 }
  88.                 if query, ok := node.pendingQueries[r.T]; ok {
  89.                         log.V(4).Infof("DHT: Received reply to %v", query.Type)
  90.                         if !node.reachable {
  91.                                 node.reachable = true
  92.                                 totalNodesReached.Add(1)
  93.                         }
  94.                         node.lastResponseTime = time.Now()
  95.                         node.pastQueries[r.T] = query
  96.                         d.routingTable.neighborhoodUpkeep(node, d.config.UDPProto, d.peerStore)

  97.                         // If this is the first host added to the routing table, attempt a
  98.                         // recursive lookup of our own address, to build our neighborhood ASAP.
  99.                         if d.needMoreNodes() {
  100.                                 log.V(5).Infof("DHT: need more nodes")
  101.                                 d.findNode(d.nodeId)
  102.                         }
  103.                         d.exploredNeighborhood = true

  104.                         switch query.Type {
  105.                         case "ping":
  106.                                 // Served its purpose, nothing else to be done.
  107.                                 totalRecvPingReply.Add(1)
  108.                         case "get_peers":
  109.                                 log.V(5).Infof("DHT: got get_peers response")
  110.                                 d.processGetPeerResults(node, r)
  111.                         case "find_node":
  112.                                 log.V(5).Infof("DHT: got find_node response")
  113.                                 d.processFindNodeResults(node, r)
  114.                         case "announce_peer":
  115.                                 // Nothing to do. In the future, update counters.
  116.                         default:
  117.                                 log.V(3).Infof("DHT: Unknown query type: %v from %v", query.Type, addr)
  118.                         }
  119.                         delete(node.pendingQueries, r.T)
  120.                 } else {
  121.                         log.V(3).Infof("DHT: Unknown query id: %v", r.T)
  122.                 }
  123.         case r.Y == "q":
  124.                 if r.A.Id == d.nodeId {
  125.                         log.V(3).Infof("DHT received packet from self, id %x", r.A.Id)
  126.                         return
  127.                 }
  128.                 node, addr, existed, err := d.routingTable.hostPortToNode(p.raddr.String(), d.config.UDPProto)
  129.                 if err != nil {
  130.                         log.Warningf("Error readResponse error processing query: %v", err)
  131.                         return
  132.                 }
  133.                 if !existed {
  134.                         // Another candidate for the routing table. See if it's reachable.
  135.                         if d.routingTable.length() < d.config.MaxNodes {
  136.                                 d.ping(addr)
  137.                         }
  138.                 }
  139.                 log.V(5).Infof("DHT processing %v request", r.Q)
  140.                 switch r.Q {
  141.                 case "ping":
  142.                         d.replyPing(p.raddr, r)
  143.                 case "get_peers":
  144.                         d.replyGetPeers(p.raddr, r)
  145.                 case "find_node":
  146.                         d.replyFindNode(p.raddr, r)
  147.                 case "announce_peer":
  148.                         d.replyAnnouncePeer(p.raddr, node, r)
  149.                 default:
  150.                         log.V(3).Infof("DHT: non-implemented handler for type %v", r.Q)
  151.                 }
  152.         default:
  153.                 log.V(3).Infof("DHT: Bogus DHT query from %v.", p.raddr)
  154.         }
  155. }

  156. func (d *DHT) ping(address string) {
  157.         r, err := d.routingTable.getOrCreateNode("", address, d.config.UDPProto)
  158.         if err != nil {
  159.                 log.V(3).Infof("ping error for address %v: %v", address, err)
  160.                 return
  161.         }
  162.         d.pingNode(r)
  163. }

  164. func (d *DHT) pingNode(r *remoteNode) {
  165.         log.V(3).Infof("DHT: ping => %+v", r.address)
  166.         t := r.newQuery("ping")

  167.         queryArguments := map[string]interface{}{"id": d.nodeId}
  168.         query := queryMessage{t, "q", "ping", queryArguments}
  169.         sendMsg(d.conn, r.address, query)
  170.         totalSentPing.Add(1)
  171. }

  172. func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
  173.         if r == nil {
  174.                 return
  175.         }
  176.         totalSentGetPeers.Add(1)
  177.         ty := "get_peers"
  178.         transId := r.newQuery(ty)
  179.         if _, ok := r.pendingQueries[transId]; ok {
  180.                 r.pendingQueries[transId].ih = ih
  181.         } else {
  182.                 r.pendingQueries[transId] = &queryType{ih: ih}
  183.         }
  184.         queryArguments := map[string]interface{}{
  185.                 "id":        d.nodeId,
  186.                 "info_hash": ih,
  187.         }
  188.         query := queryMessage{transId, "q", ty, queryArguments}
  189.         if log.V(3) {
  190.                 x := hashDistance(InfoHash(r.id), ih)
  191.                 log.V(3).Infof("DHT sending get_peers. nodeID: %x@%v, InfoHash: %x , distance: %x", r.id, r.address, ih, x)
  192.         }
  193.         r.lastSearchTime = time.Now()
  194.         sendMsg(d.conn, r.address, query)
  195. }

  196. func (d *DHT) findNodeFrom(r *remoteNode, id string) {
  197.         if r == nil {
  198.                 return
  199.         }
  200.         totalSentFindNode.Add(1)
  201.         ty := "find_node"
  202.         transId := r.newQuery(ty)
  203.         ih := InfoHash(id)
  204.         log.V(3).Infof("findNodeFrom adding pendingQueries transId=%v ih=%x", transId, ih)
  205.         if _, ok := r.pendingQueries[transId]; ok {
  206.                 r.pendingQueries[transId].ih = ih
  207.         } else {
  208.                 r.pendingQueries[transId] = &queryType{ih: ih}
  209.         }
  210.         queryArguments := map[string]interface{}{
  211.                 "id":     d.nodeId,
  212.                 "target": id,
  213.         }
  214.         query := queryMessage{transId, "q", ty, queryArguments}
  215.         if log.V(3) {
  216.                 x := hashDistance(InfoHash(r.id), ih)
  217.                 log.V(3).Infof("DHT sending find_node. nodeID: %x@%v, target ID: %x , distance: %x", r.id, r.address, id, x)
  218.         }
  219.         r.lastSearchTime = time.Now()
  220.         sendMsg(d.conn, r.address, query)
  221. }

  222. // announcePeer sends a message to the destination address to advertise that
  223. // our node is a peer for this infohash, using the provided token to
  224. // 'authenticate'.
  225. func (d *DHT) announcePeer(address net.UDPAddr, ih InfoHash, token string) {
  226.         r, err := d.routingTable.getOrCreateNode("", address.String(), d.config.UDPProto)
  227.         if err != nil {
  228.                 log.V(3).Infof("announcePeer error: %v", err)
  229.                 return
  230.         }
  231.         ty := "announce_peer"
  232.         log.V(3).Infof("DHT: announce_peer => address: %v, ih: %x, token: %x", address, ih, token)
  233.         transId := r.newQuery(ty)
  234.         queryArguments := map[string]interface{}{
  235.                 "id":        d.nodeId,
  236.                 "info_hash": ih,
  237.                 "port":      d.config.Port,
  238.                 "token":     token,
  239.         }
  240.         query := queryMessage{transId, "q", ty, queryArguments}
  241.         sendMsg(d.conn, address, query)
  242. }

  243. func (d *DHT) hostToken(addr net.UDPAddr, secret string) string {
  244.         h := sha1.New()
  245.         io.WriteString(h, addr.String())
  246.         io.WriteString(h, secret)
  247.         return fmt.Sprintf("%x", h.Sum(nil))
  248. }

  249. func (d *DHT) checkToken(addr net.UDPAddr, token string) bool {
  250.         match := false
  251.         for _, secret := range d.tokenSecrets {
  252.                 if d.hostToken(addr, secret) == token {
  253.                         match = true
  254.                         break
  255.                 }
  256.         }
  257.         log.V(4).Infof("checkToken for %v, %q matches? %v", addr, token, match)
  258.         return match
  259. }

  260. func (d *DHT) replyAnnouncePeer(addr net.UDPAddr, node *remoteNode, r responseType) {
  261.         ih := InfoHash(r.A.InfoHash)
  262.         if log.V(3) {
  263.                 log.Infof("DHT: announce_peer. Host %v, nodeID: %x, infoHash: %x, peerPort %d, distance to me %x",
  264.                         addr, r.A.Id, ih, r.A.Port, hashDistance(ih, InfoHash(d.nodeId)),
  265.                 )
  266.         }
  267.         // node can be nil if, for example, the server just restarted and received an announce_peer
  268.         // from a node it doesn't yet know about.
  269.         if node != nil && d.checkToken(addr, r.A.Token) {
  270.                 peerAddr := net.TCPAddr{IP: addr.IP, Port: r.A.Port}
  271.                 d.peerStore.addContact(ih, nettools.DottedPortToBinary(peerAddr.String()))
  272.                 // Allow searching this node immediately, since it's telling us
  273.                 // it has an infohash. Enables faster upgrade of other nodes to
  274.                 // "peer" of an infohash, if the announcement is valid.
  275.                 node.lastResponseTime = time.Now().Add(-searchRetryPeriod)
  276.                 if d.peerStore.hasLocalDownload(ih) {
  277.                         d.PeersRequestResults <- map[InfoHash][]string{ih: []string{nettools.DottedPortToBinary(peerAddr.String())}}
  278.                 }
  279.         }
  280.         // Always reply positively. jech says this is to avoid "back-tracking", not sure what that means.
  281.         reply := replyMessage{
  282.                 T: r.T,
  283.                 Y: "r",
  284.                 R: map[string]interface{}{"id": d.nodeId},
  285.         }
  286.         sendMsg(d.conn, addr, reply)
  287. }

  288. func (d *DHT) replyGetPeers(addr net.UDPAddr, r responseType) {
  289.         totalRecvGetPeers.Add(1)
  290.         if log.V(3) {
  291.                 log.Infof("DHT get_peers. Host: %v , nodeID: %x , InfoHash: %x , distance to me: %x",
  292.                         addr, r.A.Id, InfoHash(r.A.InfoHash), hashDistance(r.A.InfoHash, InfoHash(d.nodeId)))
  293.         }

  294.         if d.Logger != nil {
  295.                 d.Logger.GetPeers(addr, r.A.Id, r.A.InfoHash)
  296.         }

  297.         ih := r.A.InfoHash
  298.         r0 := map[string]interface{}{"id": d.nodeId, "token": d.hostToken(addr, d.tokenSecrets[0])}
  299.         reply := replyMessage{
  300.                 T: r.T,
  301.                 Y: "r",
  302.                 R: r0,
  303.         }

  304.         if peerContacts := d.peersForInfoHash(ih); len(peerContacts) > 0 {
  305.                 reply.R["values"] = peerContacts
  306.         } else {
  307.                 reply.R["nodes"] = d.nodesForInfoHash(ih)
  308.         }
  309.         sendMsg(d.conn, addr, reply)
  310. }

  311. func (d *DHT) nodesForInfoHash(ih InfoHash) string {
  312.         n := make([]string, 0, kNodes)
  313.         for _, r := range d.routingTable.lookup(ih) {
  314.                 // r is nil when the node was filtered.
  315.                 if r != nil {
  316.                         binaryHost := r.id + nettools.DottedPortToBinary(r.address.String())
  317.                         if binaryHost == "" {
  318.                                 log.V(3).Infof("killing node with bogus address %v", r.address.String())
  319.                                 d.routingTable.kill(r, d.peerStore)
  320.                         } else {
  321.                                 n = append(n, binaryHost)
  322.                         }
  323.                 }
  324.         }
  325.         log.V(3).Infof("replyGetPeers: Nodes only. Giving %d", len(n))
  326.         return strings.Join(n, "")
  327. }

  328. func (d *DHT) peersForInfoHash(ih InfoHash) []string {
  329.         peerContacts := d.peerStore.peerContacts(ih)
  330.         if len(peerContacts) > 0 {
  331.                 log.V(3).Infof("replyGetPeers: Giving peers! %x was requested, and we knew %d peers!", ih, len(peerContacts))
  332.         }
  333.         return peerContacts
  334. }

  335. func (d *DHT) replyFindNode(addr net.UDPAddr, r responseType) {
  336.         totalRecvFindNode.Add(1)
  337.         if log.V(3) {
  338.                 x := hashDistance(InfoHash(r.A.Target), InfoHash(d.nodeId))
  339.                 log.Infof("DHT find_node. Host: %v , nodeId: %x , target ID: %x , distance to me: %x",
  340.                         addr, r.A.Id, r.A.Target, x)
  341.         }

  342.         node := InfoHash(r.A.Target)
  343.         r0 := map[string]interface{}{"id": d.nodeId}
  344.         reply := replyMessage{
  345.                 T: r.T,
  346.                 Y: "r",
  347.                 R: r0,
  348.         }

  349.         neighbors := d.routingTable.lookupFiltered(node)
  350.         if len(neighbors) < kNodes {
  351.                 neighbors = append(neighbors, d.routingTable.lookup(node)...)
  352.         }
  353.         n := make([]string, 0, kNodes)
  354.         for _, r := range neighbors {
  355.                 n = append(n, r.id+r.addressBinaryFormat)
  356.                 if len(n) == kNodes {
  357.                         break
  358.                 }
  359.         }
  360.         log.V(3).Infof("replyFindNode: Nodes only. Giving %d", len(n))
  361.         reply.R["nodes"] = strings.Join(n, "")
  362.         sendMsg(d.conn, addr, reply)
  363. }

  364. func (d *DHT) replyPing(addr net.UDPAddr, response responseType) {
  365.         log.V(3).Infof("DHT: reply ping => %v", addr)
  366.         reply := replyMessage{
  367.                 T: response.T,
  368.                 Y: "r",
  369.                 R: map[string]interface{}{"id": d.nodeId},
  370.         }
  371.         sendMsg(d.conn, addr, reply)
  372. }

  373. // Process another node's response to a get_peers query. If the response
  374. // contains peers, send them to the Torrent engine, our client, using the
  375. // DHT.PeersRequestResults channel. If it contains closest nodes, query
  376. // them if we still need it. Also announce ourselves as a peer for that node,
  377. // unless we are in supernode mode.
  378. func (d *DHT) processGetPeerResults(node *remoteNode, resp responseType) {
  379.         totalRecvGetPeersReply.Add(1)

  380.         query, _ := node.pendingQueries[resp.T]
  381.         if d.peerStore.hasLocalDownload(query.ih) {
  382.                 d.announcePeer(node.address, query.ih, resp.R.Token)
  383.         }
  384.         if resp.R.Values != nil {
  385.                 peers := make([]string, 0)
  386.                 for _, peerContact := range resp.R.Values {
  387.                         // send peer even if we already have it in store
  388.                         // the underlying client does/should handle dupes
  389.                         d.peerStore.addContact(query.ih, peerContact)
  390.                         peers = append(peers, peerContact)
  391.                 }
  392.                 if len(peers) > 0 {
  393.                         // Finally, new peers.
  394.                         result := map[InfoHash][]string{query.ih: peers}
  395.                         totalPeers.Add(int64(len(peers)))
  396.                         log.V(2).Infof("DHT: processGetPeerResults, totalPeers: %v", totalPeers.String())
  397.                         select {
  398.                         case d.PeersRequestResults <- result:
  399.                         case <-d.stop:
  400.                                 // if we're closing down and the caller has stopped reading
  401.                                 // from PeersRequestResults, drop the result.
  402.                         }
  403.                 }
  404.         }
  405.         var nodelist string

  406.         if d.config.UDPProto == "udp4" {
  407.                 nodelist = resp.R.Nodes
  408.         } else if d.config.UDPProto == "udp6" {
  409.                 nodelist = resp.R.Nodes6
  410.         }
  411.         log.V(5).Infof("DHT: handling get_peers results len(nodelist)=%d", len(nodelist))
  412.         if nodelist != "" {
  413.                 for id, address := range parseNodesString(nodelist, d.config.UDPProto) {
  414.                         if id == d.nodeId {
  415.                                 log.V(5).Infof("DHT got reference of self for get_peers, id %x", id)
  416.                                 continue
  417.                         }

  418.                         // If it's in our routing table already, ignore it.
  419.                         _, addr, existed, err := d.routingTable.hostPortToNode(address, d.config.UDPProto)
  420.                         if err != nil {
  421.                                 log.V(3).Infof("DHT error parsing get peers node: %v", err)
  422.                                 continue
  423.                         }
  424.                         if addr == node.address.String() {
  425.                                 // This smartass is probably trying to
  426.                                 // sniff the network, or attract a lot
  427.                                 // of traffic to itself. Ignore all
  428.                                 // their results.
  429.                                 totalSelfPromotions.Add(1)
  430.                                 continue
  431.                         }
  432.                         if existed {
  433.                                 if log.V(4) {
  434.                                         x := hashDistance(query.ih, InfoHash(node.id))
  435.                                         log.Infof("DHT: processGetPeerResults DUPE node reference: %x@%v from %x@%v. Distance: %x.",
  436.                                                 id, address, node.id, node.address, x)
  437.                                 }
  438.                                 totalGetPeersDupes.Add(1)
  439.                         } else {
  440.                                 // And it is actually new. Interesting.
  441.                                 if log.V(4) {
  442.                                         x := hashDistance(query.ih, InfoHash(node.id))
  443.                                         log.Infof("DHT: Got new node reference: %x@%v from %x@%v. Distance: %x.",
  444.                                                 id, address, node.id, node.address, x)
  445.                                 }
  446.                                 if _, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto); err == nil && d.needMorePeers(query.ih) {
  447.                                         // Re-add this request to the queue. This would in theory
  448.                                         // batch similar requests, because new nodes are already
  449.                                         // available in the routing table and will be used at the
  450.                                         // next opportunity - before this particular channel send is
  451.                                         // processed. As soon we reach target number of peers these
  452.                                         // channel sends become noops.
  453.                                         //
  454.                                         // Setting the announce parameter to false because it's not
  455.                                         // needed here: if this node is downloading that particular
  456.                                         // infohash, that has already been recorded with
  457.                                         // peerStore.addLocalDownload(). The announcement itself is
  458.                                         // sent not when get_peers is sent, but when processing the
  459.                                         // reply to get_peers.
  460.                                         //
  461.                                         select {
  462.                                         case d.peersRequest <- ihReq{query.ih, false}:
  463.                                         default:
  464.                                                 // The channel is full, so drop this item. The node
  465.                                                 // was added to the routing table already, so it
  466.                                                 // will be used next time getPeers() is called -
  467.                                                 // assuming it's close enough to the ih.
  468.                                         }
  469.                                 }
  470.                         }
  471.                 }
  472.         }
  473. }

  474. // Process another node's response to a find_node query.
  475. func (d *DHT) processFindNodeResults(node *remoteNode, resp responseType) {
  476.         var nodelist string
  477.         totalRecvFindNodeReply.Add(1)

  478.         query, _ := node.pendingQueries[resp.T]
  479.         if d.config.UDPProto == "udp4" {
  480.                 nodelist = resp.R.Nodes
  481.         } else if d.config.UDPProto == "udp6" {
  482.                 nodelist = resp.R.Nodes6
  483.         }
  484.         log.V(5).Infof("processFindNodeResults find_node = %s len(nodelist)=%d", nettools.BinaryToDottedPort(node.addressBinaryFormat), len(nodelist))

  485.         if nodelist != "" {
  486.                 for id, address := range parseNodesString(nodelist, d.config.UDPProto) {
  487.                         _, addr, existed, err := d.routingTable.hostPortToNode(address, d.config.UDPProto)
  488.                         if err != nil {
  489.                                 log.V(3).Infof("DHT error parsing node from find_find response: %v", err)
  490.                                 continue
  491.                         }
  492.                         if id == d.nodeId {
  493.                                 log.V(5).Infof("DHT got reference of self for find_node, id %x", id)
  494.                                 continue
  495.                         }
  496.                         if addr == node.address.String() {
  497.                                 // SelfPromotions are more common for find_node. They are
  498.                                 // happening even for router.bittorrent.com
  499.                                 totalSelfPromotions.Add(1)
  500.                                 continue
  501.                         }
  502.                         if existed {
  503.                                 if log.V(4) {
  504.                                         x := hashDistance(query.ih, InfoHash(node.id))
  505.                                         log.Infof("DHT: processFindNodeResults DUPE node reference, query %x: %x@%v from %x@%v. Distance: %x.",
  506.                                                 query.ih, id, address, node.id, node.address, x)
  507.                                 }
  508.                                 totalFindNodeDupes.Add(1)
  509.                         } else {
  510.                                 if log.V(4) {
  511.                                         x := hashDistance(query.ih, InfoHash(node.id))
  512.                                         log.Infof("DHT: Got new node reference, query %x: %x@%v from %x@%v. Distance: %x.",
  513.                                                 query.ih, id, address, node.id, node.address, x)
  514.                                 }
  515.                                 // Includes the node in the routing table and ignores errors.
  516.                                 //
  517.                                 // Only continue the search if we really have to.
  518.                                 r, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto)
  519.                                 if err != nil {
  520.                                         log.Warningf("processFindNodeResults calling getOrCreateNode: %v. Id=%x, Address=%q", err, id, addr)
  521.                                         continue
  522.                                 }
  523.                                 if d.needMoreNodes() {
  524.                                         select {
  525.                                         case d.nodesRequest <- ihReq{query.ih, false}:
  526.                                         default:
  527.                                                 // Too many find_node commands queued up. Dropping
  528.                                                 // this. The node has already been added to the
  529.                                                 // routing table so we're not losing any
  530.                                                 // information.
  531.                                         }
  532.                                 }
  533.                                 d.getMorePeers(r)
  534.                         }
  535.                 }
  536.         }
  537. }

  538. func randNodeId() []byte {
  539.         b := make([]byte, 20)
  540.         if _, err := rand.Read(b); err != nil {
  541.                 log.Fatalln("nodeId rand:", err)
  542.         }
  543.         return b
  544. }

  545. var (
  546.         totalNodesReached            = expvar.NewInt("totalNodesReached")
  547.         totalGetPeersDupes           = expvar.NewInt("totalGetPeersDupes")
  548.         totalFindNodeDupes           = expvar.NewInt("totalFindNodeDupes")
  549.         totalSelfPromotions          = expvar.NewInt("totalSelfPromotions")
  550.         totalPeers                   = expvar.NewInt("totalPeers")
  551.         totalSentPing                = expvar.NewInt("totalSentPing")
  552.         totalSentGetPeers            = expvar.NewInt("totalSentGetPeers")
  553.         totalSentFindNode            = expvar.NewInt("totalSentFindNode")
  554.         totalRecvGetPeers            = expvar.NewInt("totalRecvGetPeers")
  555.         totalRecvGetPeersReply       = expvar.NewInt("totalRecvGetPeersReply")
  556.         totalRecvPingReply           = expvar.NewInt("totalRecvPingReply")
  557.         totalRecvFindNode            = expvar.NewInt("totalRecvFindNode")
  558.         totalRecvFindNodeReply       = expvar.NewInt("totalRecvFindNodeReply")
  559.         totalPacketsFromBlockedHosts = expvar.NewInt("totalPacketsFromBlockedHosts")
  560.         totalDroppedPackets          = expvar.NewInt("totalDroppedPackets")
  561.         totalRecv                    = expvar.NewInt("totalRecv")
  562. )
 楼主| keer_zu 发表于 2016-12-6 15:09 | 显示全部楼层
以上是dht的核心部分
cooldog123pp 发表于 2016-12-7 09:00 | 显示全部楼层
很牛啊  路过 看下
 楼主| keer_zu 发表于 2016-12-7 13:06 | 显示全部楼层
yyy71cj 发表于 2016-12-6 22:47
连package这样的关键字都出现了,看来以后的编程与软件工程就浑然一体了…… ...

对了,go语言体现的就是软件工程。
 楼主| keer_zu 发表于 2016-12-7 13:06 | 显示全部楼层
cooldog123pp 发表于 2016-12-7 09:00
很牛啊  路过 看下

可以了解一下
 楼主| keer_zu 发表于 2016-12-7 14:05 | 显示全部楼层
yyy71cj 发表于 2016-12-7 13:54
显然,这是一个用语法来淘汰rose的行为……

我觉得这是新一代语言的趋势。
 楼主| keer_zu 发表于 2016-12-7 17:26 | 显示全部楼层
yyy71cj 发表于 2016-12-7 15:30
显然是的。原来的软件工程工具与语言工具是两个,一起使用很是不习惯…… ...

原来的不适合敏捷开发模式,互联网时代需要快速反应,瀑布模型已经过时。
 楼主| keer_zu 发表于 2016-12-28 23:02 来自手机 | 显示全部楼层
您需要登录后才可以回帖 登录 | 注册

本版积分规则

1481

主题

12924

帖子

55

粉丝
快速回复 在线客服 返回列表 返回顶部