- func (d *DHT) needMoreNodes() bool {
- n := d.routingTable.numNodes()
- return n < minNodes || n*2 < d.config.MaxNodes
- }
- func (d *DHT) needMorePeers(ih InfoHash) bool {
- return d.peerStore.alive(ih) < d.config.NumTargetPeers
- }
- func (d *DHT) getMorePeers(r *remoteNode) {
- for ih := range d.peerStore.localActiveDownloads {
- if d.needMorePeers(ih) {
- if r == nil {
- d.getPeers(ih)
- } else {
- d.getPeersFrom(r, ih)
- }
- }
- }
- }
- func (d *DHT) helloFromPeer(addr string) {
- // We've got a new node id. We need to:
- // - see if we know it already, skip accordingly.
- // - ping it and see if it's reachable.
- // - if it responds, save it in the routing table.
- _, addrResolved, existed, err := d.routingTable.hostPortToNode(addr, d.config.UDPProto)
- if err != nil {
- log.Warningf("helloFromPeer error: %v", err)
- return
- }
- if existed {
- // Node host+port already known.
- return
- }
- if d.routingTable.length() < d.config.MaxNodes {
- d.ping(addrResolved)
- return
- }
- }
- func (d *DHT) processPacket(p packetType) {
- log.V(5).Infof("DHT processing packet from %v", p.raddr.String())
- if !d.clientThrottle.CheckBlock(p.raddr.IP.String()) {
- totalPacketsFromBlockedHosts.Add(1)
- log.V(5).Infof("Node exceeded rate limiter. Dropping packet.")
- return
- }
- if p.b[0] != 'd' {
- // Malformed DHT packet. There are protocol extensions out
- // there that we don't support or understand.
- log.V(5).Infof("Malformed DHT packet.")
- return
- }
- r, err := readResponse(p)
- if err != nil {
- log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
- return
- }
- switch {
- // Response.
- case r.Y == "r":
- log.V(5).Infof("DHT processing response from %x", r.R.Id)
- if bogusId(r.R.Id) {
- log.V(3).Infof("DHT received packet with bogus node id %x", r.R.Id)
- return
- }
- if r.R.Id == d.nodeId {
- log.V(3).Infof("DHT received reply from self, id %x", r.A.Id)
- return
- }
- node, addr, existed, err := d.routingTable.hostPortToNode(p.raddr.String(), d.config.UDPProto)
- if err != nil {
- log.V(3).Infof("DHT readResponse error processing response: %v", err)
- return
- }
- if !existed {
- log.V(3).Infof("DHT: Received reply from a host we don't know: %v", p.raddr)
- if d.routingTable.length() < d.config.MaxNodes {
- d.ping(addr)
- }
- return
- }
- // Fix the node ID.
- if node.id == "" {
- node.id = r.R.Id
- d.routingTable.update(node, d.config.UDPProto)
- }
- if node.id != r.R.Id {
- log.V(3).Infof("DHT: Node changed IDs %x => %x", node.id, r.R.Id)
- }
- if query, ok := node.pendingQueries[r.T]; ok {
- log.V(4).Infof("DHT: Received reply to %v", query.Type)
- if !node.reachable {
- node.reachable = true
- totalNodesReached.Add(1)
- }
- node.lastResponseTime = time.Now()
- node.pastQueries[r.T] = query
- d.routingTable.neighborhoodUpkeep(node, d.config.UDPProto, d.peerStore)
- // If this is the first host added to the routing table, attempt a
- // recursive lookup of our own address, to build our neighborhood ASAP.
- if d.needMoreNodes() {
- log.V(5).Infof("DHT: need more nodes")
- d.findNode(d.nodeId)
- }
- d.exploredNeighborhood = true
- switch query.Type {
- case "ping":
- // Served its purpose, nothing else to be done.
- totalRecvPingReply.Add(1)
- case "get_peers":
- log.V(5).Infof("DHT: got get_peers response")
- d.processGetPeerResults(node, r)
- case "find_node":
- log.V(5).Infof("DHT: got find_node response")
- d.processFindNodeResults(node, r)
- case "announce_peer":
- // Nothing to do. In the future, update counters.
- default:
- log.V(3).Infof("DHT: Unknown query type: %v from %v", query.Type, addr)
- }
- delete(node.pendingQueries, r.T)
- } else {
- log.V(3).Infof("DHT: Unknown query id: %v", r.T)
- }
- case r.Y == "q":
- if r.A.Id == d.nodeId {
- log.V(3).Infof("DHT received packet from self, id %x", r.A.Id)
- return
- }
- node, addr, existed, err := d.routingTable.hostPortToNode(p.raddr.String(), d.config.UDPProto)
- if err != nil {
- log.Warningf("Error readResponse error processing query: %v", err)
- return
- }
- if !existed {
- // Another candidate for the routing table. See if it's reachable.
- if d.routingTable.length() < d.config.MaxNodes {
- d.ping(addr)
- }
- }
- log.V(5).Infof("DHT processing %v request", r.Q)
- switch r.Q {
- case "ping":
- d.replyPing(p.raddr, r)
- case "get_peers":
- d.replyGetPeers(p.raddr, r)
- case "find_node":
- d.replyFindNode(p.raddr, r)
- case "announce_peer":
- d.replyAnnouncePeer(p.raddr, node, r)
- default:
- log.V(3).Infof("DHT: non-implemented handler for type %v", r.Q)
- }
- default:
- log.V(3).Infof("DHT: Bogus DHT query from %v.", p.raddr)
- }
- }
- func (d *DHT) ping(address string) {
- r, err := d.routingTable.getOrCreateNode("", address, d.config.UDPProto)
- if err != nil {
- log.V(3).Infof("ping error for address %v: %v", address, err)
- return
- }
- d.pingNode(r)
- }
- func (d *DHT) pingNode(r *remoteNode) {
- log.V(3).Infof("DHT: ping => %+v", r.address)
- t := r.newQuery("ping")
- queryArguments := map[string]interface{}{"id": d.nodeId}
- query := queryMessage{t, "q", "ping", queryArguments}
- sendMsg(d.conn, r.address, query)
- totalSentPing.Add(1)
- }
- func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
- if r == nil {
- return
- }
- totalSentGetPeers.Add(1)
- ty := "get_peers"
- transId := r.newQuery(ty)
- if _, ok := r.pendingQueries[transId]; ok {
- r.pendingQueries[transId].ih = ih
- } else {
- r.pendingQueries[transId] = &queryType{ih: ih}
- }
- queryArguments := map[string]interface{}{
- "id": d.nodeId,
- "info_hash": ih,
- }
- query := queryMessage{transId, "q", ty, queryArguments}
- if log.V(3) {
- x := hashDistance(InfoHash(r.id), ih)
- log.V(3).Infof("DHT sending get_peers. nodeID: %x@%v, InfoHash: %x , distance: %x", r.id, r.address, ih, x)
- }
- r.lastSearchTime = time.Now()
- sendMsg(d.conn, r.address, query)
- }
- func (d *DHT) findNodeFrom(r *remoteNode, id string) {
- if r == nil {
- return
- }
- totalSentFindNode.Add(1)
- ty := "find_node"
- transId := r.newQuery(ty)
- ih := InfoHash(id)
- log.V(3).Infof("findNodeFrom adding pendingQueries transId=%v ih=%x", transId, ih)
- if _, ok := r.pendingQueries[transId]; ok {
- r.pendingQueries[transId].ih = ih
- } else {
- r.pendingQueries[transId] = &queryType{ih: ih}
- }
- queryArguments := map[string]interface{}{
- "id": d.nodeId,
- "target": id,
- }
- query := queryMessage{transId, "q", ty, queryArguments}
- if log.V(3) {
- x := hashDistance(InfoHash(r.id), ih)
- log.V(3).Infof("DHT sending find_node. nodeID: %x@%v, target ID: %x , distance: %x", r.id, r.address, id, x)
- }
- r.lastSearchTime = time.Now()
- sendMsg(d.conn, r.address, query)
- }
- // announcePeer sends a message to the destination address to advertise that
- // our node is a peer for this infohash, using the provided token to
- // 'authenticate'.
- func (d *DHT) announcePeer(address net.UDPAddr, ih InfoHash, token string) {
- r, err := d.routingTable.getOrCreateNode("", address.String(), d.config.UDPProto)
- if err != nil {
- log.V(3).Infof("announcePeer error: %v", err)
- return
- }
- ty := "announce_peer"
- log.V(3).Infof("DHT: announce_peer => address: %v, ih: %x, token: %x", address, ih, token)
- transId := r.newQuery(ty)
- queryArguments := map[string]interface{}{
- "id": d.nodeId,
- "info_hash": ih,
- "port": d.config.Port,
- "token": token,
- }
- query := queryMessage{transId, "q", ty, queryArguments}
- sendMsg(d.conn, address, query)
- }
- func (d *DHT) hostToken(addr net.UDPAddr, secret string) string {
- h := sha1.New()
- io.WriteString(h, addr.String())
- io.WriteString(h, secret)
- return fmt.Sprintf("%x", h.Sum(nil))
- }
- func (d *DHT) checkToken(addr net.UDPAddr, token string) bool {
- match := false
- for _, secret := range d.tokenSecrets {
- if d.hostToken(addr, secret) == token {
- match = true
- break
- }
- }
- log.V(4).Infof("checkToken for %v, %q matches? %v", addr, token, match)
- return match
- }
- func (d *DHT) replyAnnouncePeer(addr net.UDPAddr, node *remoteNode, r responseType) {
- ih := InfoHash(r.A.InfoHash)
- if log.V(3) {
- log.Infof("DHT: announce_peer. Host %v, nodeID: %x, infoHash: %x, peerPort %d, distance to me %x",
- addr, r.A.Id, ih, r.A.Port, hashDistance(ih, InfoHash(d.nodeId)),
- )
- }
- // node can be nil if, for example, the server just restarted and received an announce_peer
- // from a node it doesn't yet know about.
- if node != nil && d.checkToken(addr, r.A.Token) {
- peerAddr := net.TCPAddr{IP: addr.IP, Port: r.A.Port}
- d.peerStore.addContact(ih, nettools.DottedPortToBinary(peerAddr.String()))
- // Allow searching this node immediately, since it's telling us
- // it has an infohash. Enables faster upgrade of other nodes to
- // "peer" of an infohash, if the announcement is valid.
- node.lastResponseTime = time.Now().Add(-searchRetryPeriod)
- if d.peerStore.hasLocalDownload(ih) {
- d.PeersRequestResults <- map[InfoHash][]string{ih: []string{nettools.DottedPortToBinary(peerAddr.String())}}
- }
- }
- // Always reply positively. jech says this is to avoid "back-tracking", not sure what that means.
- reply := replyMessage{
- T: r.T,
- Y: "r",
- R: map[string]interface{}{"id": d.nodeId},
- }
- sendMsg(d.conn, addr, reply)
- }
- func (d *DHT) replyGetPeers(addr net.UDPAddr, r responseType) {
- totalRecvGetPeers.Add(1)
- if log.V(3) {
- log.Infof("DHT get_peers. Host: %v , nodeID: %x , InfoHash: %x , distance to me: %x",
- addr, r.A.Id, InfoHash(r.A.InfoHash), hashDistance(r.A.InfoHash, InfoHash(d.nodeId)))
- }
- if d.Logger != nil {
- d.Logger.GetPeers(addr, r.A.Id, r.A.InfoHash)
- }
- ih := r.A.InfoHash
- r0 := map[string]interface{}{"id": d.nodeId, "token": d.hostToken(addr, d.tokenSecrets[0])}
- reply := replyMessage{
- T: r.T,
- Y: "r",
- R: r0,
- }
- if peerContacts := d.peersForInfoHash(ih); len(peerContacts) > 0 {
- reply.R["values"] = peerContacts
- } else {
- reply.R["nodes"] = d.nodesForInfoHash(ih)
- }
- sendMsg(d.conn, addr, reply)
- }
- func (d *DHT) nodesForInfoHash(ih InfoHash) string {
- n := make([]string, 0, kNodes)
- for _, r := range d.routingTable.lookup(ih) {
- // r is nil when the node was filtered.
- if r != nil {
- binaryHost := r.id + nettools.DottedPortToBinary(r.address.String())
- if binaryHost == "" {
- log.V(3).Infof("killing node with bogus address %v", r.address.String())
- d.routingTable.kill(r, d.peerStore)
- } else {
- n = append(n, binaryHost)
- }
- }
- }
- log.V(3).Infof("replyGetPeers: Nodes only. Giving %d", len(n))
- return strings.Join(n, "")
- }
- func (d *DHT) peersForInfoHash(ih InfoHash) []string {
- peerContacts := d.peerStore.peerContacts(ih)
- if len(peerContacts) > 0 {
- log.V(3).Infof("replyGetPeers: Giving peers! %x was requested, and we knew %d peers!", ih, len(peerContacts))
- }
- return peerContacts
- }
- func (d *DHT) replyFindNode(addr net.UDPAddr, r responseType) {
- totalRecvFindNode.Add(1)
- if log.V(3) {
- x := hashDistance(InfoHash(r.A.Target), InfoHash(d.nodeId))
- log.Infof("DHT find_node. Host: %v , nodeId: %x , target ID: %x , distance to me: %x",
- addr, r.A.Id, r.A.Target, x)
- }
- node := InfoHash(r.A.Target)
- r0 := map[string]interface{}{"id": d.nodeId}
- reply := replyMessage{
- T: r.T,
- Y: "r",
- R: r0,
- }
- neighbors := d.routingTable.lookupFiltered(node)
- if len(neighbors) < kNodes {
- neighbors = append(neighbors, d.routingTable.lookup(node)...)
- }
- n := make([]string, 0, kNodes)
- for _, r := range neighbors {
- n = append(n, r.id+r.addressBinaryFormat)
- if len(n) == kNodes {
- break
- }
- }
- log.V(3).Infof("replyFindNode: Nodes only. Giving %d", len(n))
- reply.R["nodes"] = strings.Join(n, "")
- sendMsg(d.conn, addr, reply)
- }
- func (d *DHT) replyPing(addr net.UDPAddr, response responseType) {
- log.V(3).Infof("DHT: reply ping => %v", addr)
- reply := replyMessage{
- T: response.T,
- Y: "r",
- R: map[string]interface{}{"id": d.nodeId},
- }
- sendMsg(d.conn, addr, reply)
- }
- // Process another node's response to a get_peers query. If the response
- // contains peers, send them to the Torrent engine, our client, using the
- // DHT.PeersRequestResults channel. If it contains closest nodes, query
- // them if we still need it. Also announce ourselves as a peer for that node,
- // unless we are in supernode mode.
- func (d *DHT) processGetPeerResults(node *remoteNode, resp responseType) {
- totalRecvGetPeersReply.Add(1)
- query, _ := node.pendingQueries[resp.T]
- if d.peerStore.hasLocalDownload(query.ih) {
- d.announcePeer(node.address, query.ih, resp.R.Token)
- }
- if resp.R.Values != nil {
- peers := make([]string, 0)
- for _, peerContact := range resp.R.Values {
- // send peer even if we already have it in store
- // the underlying client does/should handle dupes
- d.peerStore.addContact(query.ih, peerContact)
- peers = append(peers, peerContact)
- }
- if len(peers) > 0 {
- // Finally, new peers.
- result := map[InfoHash][]string{query.ih: peers}
- totalPeers.Add(int64(len(peers)))
- log.V(2).Infof("DHT: processGetPeerResults, totalPeers: %v", totalPeers.String())
- select {
- case d.PeersRequestResults <- result:
- case <-d.stop:
- // if we're closing down and the caller has stopped reading
- // from PeersRequestResults, drop the result.
- }
- }
- }
- var nodelist string
- if d.config.UDPProto == "udp4" {
- nodelist = resp.R.Nodes
- } else if d.config.UDPProto == "udp6" {
- nodelist = resp.R.Nodes6
- }
- log.V(5).Infof("DHT: handling get_peers results len(nodelist)=%d", len(nodelist))
- if nodelist != "" {
- for id, address := range parseNodesString(nodelist, d.config.UDPProto) {
- if id == d.nodeId {
- log.V(5).Infof("DHT got reference of self for get_peers, id %x", id)
- continue
- }
- // If it's in our routing table already, ignore it.
- _, addr, existed, err := d.routingTable.hostPortToNode(address, d.config.UDPProto)
- if err != nil {
- log.V(3).Infof("DHT error parsing get peers node: %v", err)
- continue
- }
- if addr == node.address.String() {
- // This smartass is probably trying to
- // sniff the network, or attract a lot
- // of traffic to itself. Ignore all
- // their results.
- totalSelfPromotions.Add(1)
- continue
- }
- if existed {
- if log.V(4) {
- x := hashDistance(query.ih, InfoHash(node.id))
- log.Infof("DHT: processGetPeerResults DUPE node reference: %x@%v from %x@%v. Distance: %x.",
- id, address, node.id, node.address, x)
- }
- totalGetPeersDupes.Add(1)
- } else {
- // And it is actually new. Interesting.
- if log.V(4) {
- x := hashDistance(query.ih, InfoHash(node.id))
- log.Infof("DHT: Got new node reference: %x@%v from %x@%v. Distance: %x.",
- id, address, node.id, node.address, x)
- }
- if _, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto); err == nil && d.needMorePeers(query.ih) {
- // Re-add this request to the queue. This would in theory
- // batch similar requests, because new nodes are already
- // available in the routing table and will be used at the
- // next opportunity - before this particular channel send is
- // processed. As soon we reach target number of peers these
- // channel sends become noops.
- //
- // Setting the announce parameter to false because it's not
- // needed here: if this node is downloading that particular
- // infohash, that has already been recorded with
- // peerStore.addLocalDownload(). The announcement itself is
- // sent not when get_peers is sent, but when processing the
- // reply to get_peers.
- //
- select {
- case d.peersRequest <- ihReq{query.ih, false}:
- default:
- // The channel is full, so drop this item. The node
- // was added to the routing table already, so it
- // will be used next time getPeers() is called -
- // assuming it's close enough to the ih.
- }
- }
- }
- }
- }
- }
- // Process another node's response to a find_node query.
- func (d *DHT) processFindNodeResults(node *remoteNode, resp responseType) {
- var nodelist string
- totalRecvFindNodeReply.Add(1)
- query, _ := node.pendingQueries[resp.T]
- if d.config.UDPProto == "udp4" {
- nodelist = resp.R.Nodes
- } else if d.config.UDPProto == "udp6" {
- nodelist = resp.R.Nodes6
- }
- log.V(5).Infof("processFindNodeResults find_node = %s len(nodelist)=%d", nettools.BinaryToDottedPort(node.addressBinaryFormat), len(nodelist))
- if nodelist != "" {
- for id, address := range parseNodesString(nodelist, d.config.UDPProto) {
- _, addr, existed, err := d.routingTable.hostPortToNode(address, d.config.UDPProto)
- if err != nil {
- log.V(3).Infof("DHT error parsing node from find_find response: %v", err)
- continue
- }
- if id == d.nodeId {
- log.V(5).Infof("DHT got reference of self for find_node, id %x", id)
- continue
- }
- if addr == node.address.String() {
- // SelfPromotions are more common for find_node. They are
- // happening even for router.bittorrent.com
- totalSelfPromotions.Add(1)
- continue
- }
- if existed {
- if log.V(4) {
- x := hashDistance(query.ih, InfoHash(node.id))
- log.Infof("DHT: processFindNodeResults DUPE node reference, query %x: %x@%v from %x@%v. Distance: %x.",
- query.ih, id, address, node.id, node.address, x)
- }
- totalFindNodeDupes.Add(1)
- } else {
- if log.V(4) {
- x := hashDistance(query.ih, InfoHash(node.id))
- log.Infof("DHT: Got new node reference, query %x: %x@%v from %x@%v. Distance: %x.",
- query.ih, id, address, node.id, node.address, x)
- }
- // Includes the node in the routing table and ignores errors.
- //
- // Only continue the search if we really have to.
- r, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto)
- if err != nil {
- log.Warningf("processFindNodeResults calling getOrCreateNode: %v. Id=%x, Address=%q", err, id, addr)
- continue
- }
- if d.needMoreNodes() {
- select {
- case d.nodesRequest <- ihReq{query.ih, false}:
- default:
- // Too many find_node commands queued up. Dropping
- // this. The node has already been added to the
- // routing table so we're not losing any
- // information.
- }
- }
- d.getMorePeers(r)
- }
- }
- }
- }
// randNodeId returns 20 bytes read from rand.Read, for use as a node id. The
// process is terminated through log.Fatalln if the read fails.
func randNodeId() []byte {
	const idLen = 20
	id := make([]byte, idLen)
	_, err := rand.Read(id)
	if err != nil {
		log.Fatalln("nodeId rand:", err)
	}
	return id
}
// Counters published through expvar under the same name as the variable.
var (
	// Routing and lookup outcomes: nodes marked reachable for the first
	// time, already-known node references seen in get_peers / find_node
	// replies, responders that listed their own address, and peers received.
	totalNodesReached   = expvar.NewInt("totalNodesReached")
	totalGetPeersDupes  = expvar.NewInt("totalGetPeersDupes")
	totalFindNodeDupes  = expvar.NewInt("totalFindNodeDupes")
	totalSelfPromotions = expvar.NewInt("totalSelfPromotions")
	totalPeers          = expvar.NewInt("totalPeers")

	// Queries sent by this node.
	totalSentPing     = expvar.NewInt("totalSentPing")
	totalSentGetPeers = expvar.NewInt("totalSentGetPeers")
	totalSentFindNode = expvar.NewInt("totalSentFindNode")

	// Queries and replies received from other nodes.
	totalRecvGetPeers      = expvar.NewInt("totalRecvGetPeers")
	totalRecvGetPeersReply = expvar.NewInt("totalRecvGetPeersReply")
	totalRecvPingReply     = expvar.NewInt("totalRecvPingReply")
	totalRecvFindNode      = expvar.NewInt("totalRecvFindNode")
	totalRecvFindNodeReply = expvar.NewInt("totalRecvFindNodeReply")

	// Packet-level counters. totalPacketsFromBlockedHosts counts packets
	// rejected by the client throttle; the other two are not updated in this
	// part of the file.
	totalPacketsFromBlockedHosts = expvar.NewInt("totalPacketsFromBlockedHosts")
	totalDroppedPackets          = expvar.NewInt("totalDroppedPackets")
	totalRecv                    = expvar.NewInt("totalRecv")
)