// needMoreNodes reports whether the routing table is still undersized: either
// below the hard minimum, or filled to less than half the configured maximum.
func (d *DHT) needMoreNodes() bool {
	count := d.routingTable.numNodes()
	if count < minNodes {
		return true
	}
	return count*2 < d.config.MaxNodes
}
// needMorePeers reports whether we know fewer live peers for ih than the
// configured target.
func (d *DHT) needMorePeers(ih InfoHash) bool {
	alive := d.peerStore.alive(ih)
	return alive < d.config.NumTargetPeers
}
// getMorePeers issues get_peers searches for every locally active download
// that still needs peers. When r is non-nil the query goes to that node only;
// otherwise a general search is started.
func (d *DHT) getMorePeers(r *remoteNode) {
	for ih := range d.peerStore.localActiveDownloads {
		if !d.needMorePeers(ih) {
			continue
		}
		if r == nil {
			d.getPeers(ih)
			continue
		}
		d.getPeersFrom(r, ih)
	}
}
// helloFromPeer handles a freshly learned node address:
//   - skip it if the host+port is already in the routing table;
//   - otherwise ping it, so that a reply gets it recorded as reachable.
func (d *DHT) helloFromPeer(addr string) {
	_, addrResolved, existed, err := d.routingTable.hostPortToNode(addr, d.config.UDPProto)
	if err != nil {
		log.Warningf("helloFromPeer error: %v", err)
		return
	}
	if existed {
		// Node host+port already known.
		return
	}
	if d.routingTable.length() >= d.config.MaxNodes {
		// Routing table is full; no room for another candidate.
		return
	}
	d.ping(addrResolved)
}
// processPacket handles one incoming UDP packet end to end: per-IP rate
// limiting, coarse validation, bencode decoding, then dispatch either as a
// reply to one of our pending queries (Y == "r") or as a query from a remote
// node (Y == "q").
func (d *DHT) processPacket(p packetType) {
	log.V(5).Infof("DHT processing packet from %v", p.raddr.String())
	if !d.clientThrottle.CheckBlock(p.raddr.IP.String()) {
		totalPacketsFromBlockedHosts.Add(1)
		log.V(5).Infof("Node exceeded rate limiter. Dropping packet.")
		return
	}
	// Every well-formed KRPC message is a bencoded dictionary, so the
	// payload must start with 'd'.
	if p.b[0] != 'd' {
		// Malformed DHT packet. There are protocol extensions out
		// there that we don't support or understand.
		log.V(5).Infof("Malformed DHT packet.")
		return
	}
	r, err := readResponse(p)
	if err != nil {
		log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
		return
	}
	switch {
	// Response.
	case r.Y == "r":
		log.V(5).Infof("DHT processing response from %x", r.R.Id)
		if bogusId(r.R.Id) {
			log.V(3).Infof("DHT received packet with bogus node id %x", r.R.Id)
			return
		}
		if r.R.Id == d.nodeId {
			log.V(3).Infof("DHT received reply from self, id %x", r.A.Id)
			return
		}
		node, addr, existed, err := d.routingTable.hostPortToNode(p.raddr.String(), d.config.UDPProto)
		if err != nil {
			log.V(3).Infof("DHT readResponse error processing response: %v", err)
			return
		}
		if !existed {
			// A reply from a host we never queried. Don't trust its
			// content, but consider it a routing table candidate if
			// there's room.
			log.V(3).Infof("DHT: Received reply from a host we don't know: %v", p.raddr)
			if d.routingTable.length() < d.config.MaxNodes {
				d.ping(addr)
			}
			return
		}
		// Fix the node ID: it may have been created before we knew its id
		// (e.g. via ping by address only).
		if node.id == "" {
			node.id = r.R.Id
			d.routingTable.update(node, d.config.UDPProto)
		}
		if node.id != r.R.Id {
			log.V(3).Infof("DHT: Node changed IDs %x => %x", node.id, r.R.Id)
		}
		if query, ok := node.pendingQueries[r.T]; ok {
			log.V(4).Infof("DHT: Received reply to %v", query.Type)
			if !node.reachable {
				node.reachable = true
				totalNodesReached.Add(1)
			}
			node.lastResponseTime = time.Now()
			node.pastQueries[r.T] = query
			d.routingTable.neighborhoodUpkeep(node, d.config.UDPProto, d.peerStore)
			// If this is the first host added to the routing table, attempt a
			// recursive lookup of our own address, to build our neighborhood ASAP.
			if d.needMoreNodes() {
				log.V(5).Infof("DHT: need more nodes")
				d.findNode(d.nodeId)
			}
			d.exploredNeighborhood = true
			// Dispatch on the type of the query this packet answers.
			switch query.Type {
			case "ping":
				// Served its purpose, nothing else to be done.
				totalRecvPingReply.Add(1)
			case "get_peers":
				log.V(5).Infof("DHT: got get_peers response")
				d.processGetPeerResults(node, r)
			case "find_node":
				log.V(5).Infof("DHT: got find_node response")
				d.processFindNodeResults(node, r)
			case "announce_peer":
				// Nothing to do. In the future, update counters.
			default:
				log.V(3).Infof("DHT: Unknown query type: %v from %v", query.Type, addr)
			}
			// The transaction is complete; drop it from the pending set.
			delete(node.pendingQueries, r.T)
		} else {
			log.V(3).Infof("DHT: Unknown query id: %v", r.T)
		}
	case r.Y == "q":
		if r.A.Id == d.nodeId {
			log.V(3).Infof("DHT received packet from self, id %x", r.A.Id)
			return
		}
		node, addr, existed, err := d.routingTable.hostPortToNode(p.raddr.String(), d.config.UDPProto)
		if err != nil {
			log.Warningf("Error readResponse error processing query: %v", err)
			return
		}
		if !existed {
			// Another candidate for the routing table. See if it's reachable.
			if d.routingTable.length() < d.config.MaxNodes {
				d.ping(addr)
			}
		}
		log.V(5).Infof("DHT processing %v request", r.Q)
		switch r.Q {
		case "ping":
			d.replyPing(p.raddr, r)
		case "get_peers":
			d.replyGetPeers(p.raddr, r)
		case "find_node":
			d.replyFindNode(p.raddr, r)
		case "announce_peer":
			d.replyAnnouncePeer(p.raddr, node, r)
		default:
			log.V(3).Infof("DHT: non-implemented handler for type %v", r.Q)
		}
	default:
		log.V(3).Infof("DHT: Bogus DHT query from %v.", p.raddr)
	}
}
// ping sends a ping query to the given host:port, creating a routing table
// entry with an empty id if the address is not yet known.
func (d *DHT) ping(address string) {
	node, err := d.routingTable.getOrCreateNode("", address, d.config.UDPProto)
	if err != nil {
		log.V(3).Infof("ping error for address %v: %v", address, err)
		return
	}
	d.pingNode(node)
}
// pingNode sends a ping query to the remote node r and bumps the sent-ping
// counter.
func (d *DHT) pingNode(r *remoteNode) {
	log.V(3).Infof("DHT: ping => %+v", r.address)
	transId := r.newQuery("ping")
	msg := queryMessage{transId, "q", "ping", map[string]interface{}{"id": d.nodeId}}
	sendMsg(d.conn, r.address, msg)
	totalSentPing.Add(1)
}
// getPeersFrom sends a get_peers query for ih to the remote node r, recording
// the infohash on the pending query so the eventual reply can be matched back
// to this search. A nil r is a no-op.
func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
	if r == nil {
		return
	}
	totalSentGetPeers.Add(1)
	ty := "get_peers"
	transId := r.newQuery(ty)
	// Attach the infohash to the pending query record, reusing the entry if
	// newQuery already created one. Single map lookup instead of the previous
	// check-then-index pair.
	if q, ok := r.pendingQueries[transId]; ok {
		q.ih = ih
	} else {
		r.pendingQueries[transId] = &queryType{ih: ih}
	}
	queryArguments := map[string]interface{}{
		"id":        d.nodeId,
		"info_hash": ih,
	}
	query := queryMessage{transId, "q", ty, queryArguments}
	if log.V(3) {
		x := hashDistance(InfoHash(r.id), ih)
		log.V(3).Infof("DHT sending get_peers. nodeID: %x@%v, InfoHash: %x , distance: %x", r.id, r.address, ih, x)
	}
	r.lastSearchTime = time.Now()
	sendMsg(d.conn, r.address, query)
}
// findNodeFrom sends a find_node query for the target id to the remote node r,
// recording the target on the pending query so the eventual reply can be
// matched back to this search. A nil r is a no-op.
func (d *DHT) findNodeFrom(r *remoteNode, id string) {
	if r == nil {
		return
	}
	totalSentFindNode.Add(1)
	ty := "find_node"
	transId := r.newQuery(ty)
	ih := InfoHash(id)
	log.V(3).Infof("findNodeFrom adding pendingQueries transId=%v ih=%x", transId, ih)
	// Attach the target to the pending query record, reusing the entry if
	// newQuery already created one. Single map lookup instead of the previous
	// check-then-index pair.
	if q, ok := r.pendingQueries[transId]; ok {
		q.ih = ih
	} else {
		r.pendingQueries[transId] = &queryType{ih: ih}
	}
	queryArguments := map[string]interface{}{
		"id":     d.nodeId,
		"target": id,
	}
	query := queryMessage{transId, "q", ty, queryArguments}
	if log.V(3) {
		x := hashDistance(InfoHash(r.id), ih)
		log.V(3).Infof("DHT sending find_node. nodeID: %x@%v, target ID: %x , distance: %x", r.id, r.address, id, x)
	}
	r.lastSearchTime = time.Now()
	sendMsg(d.conn, r.address, query)
}
// announcePeer tells the node at the destination address that we are a peer
// for the given infohash, presenting the token that node previously handed us
// as 'authentication'.
func (d *DHT) announcePeer(address net.UDPAddr, ih InfoHash, token string) {
	node, err := d.routingTable.getOrCreateNode("", address.String(), d.config.UDPProto)
	if err != nil {
		log.V(3).Infof("announcePeer error: %v", err)
		return
	}
	ty := "announce_peer"
	log.V(3).Infof("DHT: announce_peer => address: %v, ih: %x, token: %x", address, ih, token)
	transId := node.newQuery(ty)
	args := map[string]interface{}{
		"id":        d.nodeId,
		"info_hash": ih,
		"port":      d.config.Port,
		"token":     token,
	}
	sendMsg(d.conn, address, queryMessage{transId, "q", ty, args})
}
// hostToken derives the opaque get_peers token for addr from a secret:
// the hex encoding of sha1(addr || secret).
func (d *DHT) hostToken(addr net.UDPAddr, secret string) string {
	hasher := sha1.New()
	io.WriteString(hasher, addr.String())
	io.WriteString(hasher, secret)
	return fmt.Sprintf("%x", hasher.Sum(nil))
}
// checkToken reports whether token is valid for addr under any of the
// currently accepted secrets (the newest plus recently rotated ones).
func (d *DHT) checkToken(addr net.UDPAddr, token string) bool {
	var match bool
	for _, secret := range d.tokenSecrets {
		match = d.hostToken(addr, secret) == token
		if match {
			break
		}
	}
	log.V(4).Infof("checkToken for %v, %q matches? %v", addr, token, match)
	return match
}
// replyAnnouncePeer records a valid announce_peer query in the peer store and
// always answers positively, regardless of token validity.
func (d *DHT) replyAnnouncePeer(addr net.UDPAddr, node *remoteNode, r responseType) {
	ih := InfoHash(r.A.InfoHash)
	if log.V(3) {
		log.Infof("DHT: announce_peer. Host %v, nodeID: %x, infoHash: %x, peerPort %d, distance to me %x",
			addr, r.A.Id, ih, r.A.Port, hashDistance(ih, InfoHash(d.nodeId)),
		)
	}
	// node can be nil if, for example, the server just restarted and received an announce_peer
	// from a node it doesn't yet know about.
	if node != nil && d.checkToken(addr, r.A.Token) {
		tcpAddr := net.TCPAddr{IP: addr.IP, Port: r.A.Port}
		contact := nettools.DottedPortToBinary(tcpAddr.String())
		d.peerStore.addContact(ih, contact)
		// Allow searching this node immediately, since it's telling us
		// it has an infohash. Enables faster upgrade of other nodes to
		// "peer" of an infohash, if the announcement is valid.
		node.lastResponseTime = time.Now().Add(-searchRetryPeriod)
		if d.peerStore.hasLocalDownload(ih) {
			d.PeersRequestResults <- map[InfoHash][]string{ih: {contact}}
		}
	}
	// Always reply positively. jech says this is to avoid "back-tracking", not sure what that means.
	sendMsg(d.conn, addr, replyMessage{
		T: r.T,
		Y: "r",
		R: map[string]interface{}{"id": d.nodeId},
	})
}
// replyGetPeers answers a get_peers query: it returns known peer contacts for
// the infohash when we have any, otherwise our closest known nodes, always
// together with a token the querier must echo back in announce_peer.
func (d *DHT) replyGetPeers(addr net.UDPAddr, r responseType) {
	totalRecvGetPeers.Add(1)
	if log.V(3) {
		log.Infof("DHT get_peers. Host: %v , nodeID: %x , InfoHash: %x , distance to me: %x",
			addr, r.A.Id, InfoHash(r.A.InfoHash), hashDistance(r.A.InfoHash, InfoHash(d.nodeId)))
	}
	if d.Logger != nil {
		d.Logger.GetPeers(addr, r.A.Id, r.A.InfoHash)
	}
	ih := r.A.InfoHash
	reply := replyMessage{
		T: r.T,
		Y: "r",
		R: map[string]interface{}{
			"id":    d.nodeId,
			"token": d.hostToken(addr, d.tokenSecrets[0]),
		},
	}
	peerContacts := d.peersForInfoHash(ih)
	if len(peerContacts) > 0 {
		reply.R["values"] = peerContacts
	} else {
		reply.R["nodes"] = d.nodesForInfoHash(ih)
	}
	sendMsg(d.conn, addr, reply)
}
// nodesForInfoHash returns the compact node info (id + binary host:port,
// concatenated) of our nodes closest to ih, for use in a get_peers reply.
func (d *DHT) nodesForInfoHash(ih InfoHash) string {
	hosts := make([]string, 0, kNodes)
	for _, r := range d.routingTable.lookup(ih) {
		if r == nil {
			// The node was filtered out of the lookup.
			continue
		}
		binaryHost := r.id + nettools.DottedPortToBinary(r.address.String())
		if binaryHost == "" {
			// NOTE(review): this only fires when both the id and the encoded
			// address are empty; confirm whether the intent was to test the
			// address encoding alone.
			log.V(3).Infof("killing node with bogus address %v", r.address.String())
			d.routingTable.kill(r, d.peerStore)
			continue
		}
		hosts = append(hosts, binaryHost)
	}
	log.V(3).Infof("replyGetPeers: Nodes only. Giving %d", len(hosts))
	return strings.Join(hosts, "")
}
// peersForInfoHash returns the peer contacts we know for ih, logging when the
// store has any to give.
func (d *DHT) peersForInfoHash(ih InfoHash) []string {
	contacts := d.peerStore.peerContacts(ih)
	if len(contacts) == 0 {
		return contacts
	}
	log.V(3).Infof("replyGetPeers: Giving peers! %x was requested, and we knew %d peers!", ih, len(contacts))
	return contacts
}
// replyFindNode answers a find_node query with up to kNodes of our known
// nodes closest to the requested target, in compact node format.
func (d *DHT) replyFindNode(addr net.UDPAddr, r responseType) {
	totalRecvFindNode.Add(1)
	target := InfoHash(r.A.Target)
	if log.V(3) {
		log.Infof("DHT find_node. Host: %v , nodeId: %x , target ID: %x , distance to me: %x",
			addr, r.A.Id, r.A.Target, hashDistance(target, InfoHash(d.nodeId)))
	}
	reply := replyMessage{
		T: r.T,
		Y: "r",
		R: map[string]interface{}{"id": d.nodeId},
	}
	// Prefer filtered neighbors; top up with an unfiltered lookup if that
	// didn't yield enough.
	neighbors := d.routingTable.lookupFiltered(target)
	if len(neighbors) < kNodes {
		neighbors = append(neighbors, d.routingTable.lookup(target)...)
	}
	nodes := make([]string, 0, kNodes)
	for _, neighbor := range neighbors {
		nodes = append(nodes, neighbor.id+neighbor.addressBinaryFormat)
		if len(nodes) == kNodes {
			break
		}
	}
	log.V(3).Infof("replyFindNode: Nodes only. Giving %d", len(nodes))
	reply.R["nodes"] = strings.Join(nodes, "")
	sendMsg(d.conn, addr, reply)
}
// replyPing answers a ping query with our node id, echoing the transaction id.
func (d *DHT) replyPing(addr net.UDPAddr, response responseType) {
	log.V(3).Infof("DHT: reply ping => %v", addr)
	pong := replyMessage{
		T: response.T,
		Y: "r",
		R: map[string]interface{}{"id": d.nodeId},
	}
	sendMsg(d.conn, addr, pong)
}
// Process another node's response to a get_peers query. If the response
// contains peers, send them to the Torrent engine, our client, using the
// DHT.PeersRequestResults channel. If it contains closest nodes, query
// them if we still need it. Also announce ourselves as a peer for that
// infohash if it is one of our local downloads.
func (d *DHT) processGetPeerResults(node *remoteNode, resp responseType) {
	totalRecvGetPeersReply.Add(1)
	// The caller (processPacket) only dispatches here for a transaction id it
	// found in pendingQueries, so this lookup is expected to succeed.
	query, _ := node.pendingQueries[resp.T]
	if d.peerStore.hasLocalDownload(query.ih) {
		d.announcePeer(node.address, query.ih, resp.R.Token)
	}
	// "values" entries are compact peer contacts for the requested infohash.
	if resp.R.Values != nil {
		peers := make([]string, 0)
		for _, peerContact := range resp.R.Values {
			// send peer even if we already have it in store
			// the underlying client does/should handle dupes
			d.peerStore.addContact(query.ih, peerContact)
			peers = append(peers, peerContact)
		}
		if len(peers) > 0 {
			// Finally, new peers.
			result := map[InfoHash][]string{query.ih: peers}
			totalPeers.Add(int64(len(peers)))
			log.V(2).Infof("DHT: processGetPeerResults, totalPeers: %v", totalPeers.String())
			select {
			case d.PeersRequestResults <- result:
			case <-d.stop:
				// if we're closing down and the caller has stopped reading
				// from PeersRequestResults, drop the result.
			}
		}
	}
	// Pick the compact node list matching our own address family.
	var nodelist string
	if d.config.UDPProto == "udp4" {
		nodelist = resp.R.Nodes
	} else if d.config.UDPProto == "udp6" {
		nodelist = resp.R.Nodes6
	}
	log.V(5).Infof("DHT: handling get_peers results len(nodelist)=%d", len(nodelist))
	if nodelist != "" {
		for id, address := range parseNodesString(nodelist, d.config.UDPProto) {
			if id == d.nodeId {
				log.V(5).Infof("DHT got reference of self for get_peers, id %x", id)
				continue
			}
			// If it's in our routing table already, ignore it.
			_, addr, existed, err := d.routingTable.hostPortToNode(address, d.config.UDPProto)
			if err != nil {
				log.V(3).Infof("DHT error parsing get peers node: %v", err)
				continue
			}
			if addr == node.address.String() {
				// This smartass is probably trying to
				// sniff the network, or attract a lot
				// of traffic to itself. Ignore all
				// their results.
				totalSelfPromotions.Add(1)
				continue
			}
			if existed {
				if log.V(4) {
					x := hashDistance(query.ih, InfoHash(node.id))
					log.Infof("DHT: processGetPeerResults DUPE node reference: %x@%v from %x@%v. Distance: %x.",
						id, address, node.id, node.address, x)
				}
				totalGetPeersDupes.Add(1)
			} else {
				// And it is actually new. Interesting.
				if log.V(4) {
					x := hashDistance(query.ih, InfoHash(node.id))
					log.Infof("DHT: Got new node reference: %x@%v from %x@%v. Distance: %x.",
						id, address, node.id, node.address, x)
				}
				if _, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto); err == nil && d.needMorePeers(query.ih) {
					// Re-add this request to the queue. This would in theory
					// batch similar requests, because new nodes are already
					// available in the routing table and will be used at the
					// next opportunity - before this particular channel send is
					// processed. As soon we reach target number of peers these
					// channel sends become noops.
					//
					// Setting the announce parameter to false because it's not
					// needed here: if this node is downloading that particular
					// infohash, that has already been recorded with
					// peerStore.addLocalDownload(). The announcement itself is
					// sent not when get_peers is sent, but when processing the
					// reply to get_peers.
					//
					select {
					case d.peersRequest <- ihReq{query.ih, false}:
					default:
						// The channel is full, so drop this item. The node
						// was added to the routing table already, so it
						// will be used next time getPeers() is called -
						// assuming it's close enough to the ih.
					}
				}
			}
		}
	}
}
// processFindNodeResults processes another node's response to a find_node
// query. Every node referenced in the reply that is not us, not the responder
// itself and not already in the routing table is added to it; the search only
// continues while we still need more nodes.
func (d *DHT) processFindNodeResults(node *remoteNode, resp responseType) {
	var nodelist string
	totalRecvFindNodeReply.Add(1)
	// The caller (processPacket) only dispatches here for a transaction id it
	// found in pendingQueries, so this lookup is expected to succeed.
	query := node.pendingQueries[resp.T]
	// "nodes" carries IPv4 compact node info; "nodes6" the IPv6 variant.
	if d.config.UDPProto == "udp4" {
		nodelist = resp.R.Nodes
	} else if d.config.UDPProto == "udp6" {
		nodelist = resp.R.Nodes6
	}
	log.V(5).Infof("processFindNodeResults find_node = %s len(nodelist)=%d", nettools.BinaryToDottedPort(node.addressBinaryFormat), len(nodelist))
	if nodelist != "" {
		for id, address := range parseNodesString(nodelist, d.config.UDPProto) {
			_, addr, existed, err := d.routingTable.hostPortToNode(address, d.config.UDPProto)
			if err != nil {
				// (Fixed log typo: used to read "find_find response".)
				log.V(3).Infof("DHT error parsing node from find_node response: %v", err)
				continue
			}
			if id == d.nodeId {
				log.V(5).Infof("DHT got reference of self for find_node, id %x", id)
				continue
			}
			if addr == node.address.String() {
				// SelfPromotions are more common for find_node. They are
				// happening even for router.bittorrent.com
				totalSelfPromotions.Add(1)
				continue
			}
			if existed {
				if log.V(4) {
					x := hashDistance(query.ih, InfoHash(node.id))
					log.Infof("DHT: processFindNodeResults DUPE node reference, query %x: %x@%v from %x@%v. Distance: %x.",
						query.ih, id, address, node.id, node.address, x)
				}
				totalFindNodeDupes.Add(1)
			} else {
				if log.V(4) {
					x := hashDistance(query.ih, InfoHash(node.id))
					log.Infof("DHT: Got new node reference, query %x: %x@%v from %x@%v. Distance: %x.",
						query.ih, id, address, node.id, node.address, x)
				}
				// Includes the node in the routing table and ignores errors.
				//
				// Only continue the search if we really have to.
				r, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto)
				if err != nil {
					log.Warningf("processFindNodeResults calling getOrCreateNode: %v. Id=%x, Address=%q", err, id, addr)
					continue
				}
				if d.needMoreNodes() {
					select {
					case d.nodesRequest <- ihReq{query.ih, false}:
					default:
						// Too many find_node commands queued up. Dropping
						// this. The node has already been added to the
						// routing table so we're not losing any
						// information.
					}
				}
				d.getMorePeers(r)
			}
		}
	}
}
// randNodeId returns 20 random bytes suitable for use as a DHT node id,
// aborting the process if the entropy source fails.
func randNodeId() []byte {
	id := make([]byte, 20)
	if _, err := rand.Read(id); err != nil {
		log.Fatalln("nodeId rand:", err)
	}
	return id
}
// expvar counters exported for monitoring. They track DHT traffic in both
// directions: queries/replies sent and received per message type, peers
// discovered, dropped or throttled packets, and duplicate/self-promoting
// node references.
var (
	totalNodesReached            = expvar.NewInt("totalNodesReached")
	totalGetPeersDupes           = expvar.NewInt("totalGetPeersDupes")
	totalFindNodeDupes           = expvar.NewInt("totalFindNodeDupes")
	totalSelfPromotions          = expvar.NewInt("totalSelfPromotions")
	totalPeers                   = expvar.NewInt("totalPeers")
	totalSentPing                = expvar.NewInt("totalSentPing")
	totalSentGetPeers            = expvar.NewInt("totalSentGetPeers")
	totalSentFindNode            = expvar.NewInt("totalSentFindNode")
	totalRecvGetPeers            = expvar.NewInt("totalRecvGetPeers")
	totalRecvGetPeersReply       = expvar.NewInt("totalRecvGetPeersReply")
	totalRecvPingReply           = expvar.NewInt("totalRecvPingReply")
	totalRecvFindNode            = expvar.NewInt("totalRecvFindNode")
	totalRecvFindNodeReply       = expvar.NewInt("totalRecvFindNodeReply")
	totalPacketsFromBlockedHosts = expvar.NewInt("totalPacketsFromBlockedHosts")
	totalDroppedPackets          = expvar.NewInt("totalDroppedPackets")
	totalRecv                    = expvar.NewInt("totalRecv")
)