Skip to content

Commit

Permalink
p2p/discover: improve test reliability
Browse files Browse the repository at this point in the history
  • Loading branch information
fjl committed May 3, 2024
1 parent 051ca13 commit 9faae76
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 12 deletions.
44 changes: 35 additions & 9 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,48 @@ func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding
<-tab.initDone

// Fill up the sender's bucket.
tab.mutex.Lock()
replacementNodeKey, _ := crypto.HexToECDSA("45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8")
replacementNode := wrapNode(enode.NewV4(&replacementNodeKey.PublicKey, net.IP{127, 0, 0, 1}, 99, 99))
last := fillBucket(tab, replacementNode)
last := fillBucket(tab, replacementNode.ID())
nodeEvents := newNodeEventRecorder(128)
tab.nodeAddedHook = nodeEvents.nodeAdded
tab.nodeRemovedHook = nodeEvents.nodeRemoved
tab.mutex.Unlock()

// Add the sender as if it just pinged us. The revalidation process should replace
// The revalidation process should replace
// this node in the bucket if it is unresponsive.
transport.dead[last.ID()] = !lastInBucketIsResponding
transport.dead[replacementNode.ID()] = !newNodeIsResponding

// Add replacement node to table.
tab.addFoundNode(replacementNode)

t.Log("last:", last.ID())
t.Log("replacement:", replacementNode.ID())

// Wait until the last node was pinged.
waitForRevalidationPing(t, transport, tab, last.ID())

// If a replacement is expected, we also need to wait until the replacement node was pinged.
if !lastInBucketIsResponding {
if !nodeEvents.waitNodeAbsent(last.ID(), 2*time.Second) {
t.Error("last node was not removed")
}
if !nodeEvents.waitNodePresent(replacementNode.ID(), 2*time.Second) {
t.Error("replacement node was not added")
}

// If a replacement is expected, we also need to wait until the replacement node
// was pinged and added/removed.
waitForRevalidationPing(t, transport, tab, replacementNode.ID())
if !newNodeIsResponding {
if !nodeEvents.waitNodeAbsent(replacementNode.ID(), 2*time.Second) {
t.Error("replacement node was not removed")
}
}
}

// Check bucket content.
tab.mutex.Lock()
defer tab.mutex.Unlock()
wantSize := bucketSize
Expand All @@ -89,26 +113,28 @@ func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding
}
bucket := tab.bucket(replacementNode.ID())
if l := len(bucket.entries); l != wantSize {
t.Errorf("wrong bucket size after bond: got %d, want %d", l, wantSize)
t.Errorf("wrong bucket size after revalidation: got %d, want %d", l, wantSize)
}
if ok := contains(bucket.entries, last.ID()); ok != lastInBucketIsResponding {
t.Errorf("last entry found: %t, want: %t", ok, lastInBucketIsResponding)
t.Errorf("revalidated node found: %t, want: %t", ok, lastInBucketIsResponding)
}
wantNewEntry := newNodeIsResponding && !lastInBucketIsResponding
if ok := contains(bucket.entries, replacementNode.ID()); ok != wantNewEntry {
t.Errorf("new entry found: %t, want: %t", ok, wantNewEntry)
t.Errorf("replacement node found: %t, want: %t", ok, wantNewEntry)
}
}

// waitForRevalidationPing waits until a PING message is sent to a node with the given id.
func waitForRevalidationPing(t *testing.T, transport *pingRecorder, tab *Table, id enode.ID) *enode.Node {
t.Helper()

simclock := tab.cfg.Clock.(*mclock.Simulated)
maxAttempts := tab.len() * 5
maxAttempts := tab.len() * 8
for i := 0; i < maxAttempts; i++ {
simclock.Run(tab.cfg.PingInterval)
p := transport.waitPing(500 * time.Millisecond)
p := transport.waitPing(2 * time.Second)
if p == nil {
t.Fatal("Table did not send any revalidation ping")
t.Fatal("Table did not send revalidation ping")
}
if id == (enode.ID{}) || p.ID() == id {
return p
Expand Down
60 changes: 57 additions & 3 deletions p2p/discover/table_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func intIP(i int) net.IP {
}

// fillBucket inserts nodes into the given bucket until it is full.
func fillBucket(tab *Table, n *node) (last *node) {
ld := enode.LogDist(tab.self().ID(), n.ID())
b := tab.bucket(n.ID())
func fillBucket(tab *Table, id enode.ID) (last *node) {
ld := enode.LogDist(tab.self().ID(), id)
b := tab.bucket(id)
for len(b.entries) < bucketSize {
node := nodeAtDistance(tab.self().ID(), ld, intIP(ld))
b.entries = append(b.entries, node)
Expand Down Expand Up @@ -291,3 +291,57 @@ func hexEncPubkey(h string) (ret encPubkey) {
copy(ret[:], b)
return ret
}

type nodeEventRecorder struct {
evc chan recordedNodeEvent
}

type recordedNodeEvent struct {
node *node
added bool
}

func newNodeEventRecorder(buffer int) *nodeEventRecorder {
return &nodeEventRecorder{
evc: make(chan recordedNodeEvent, buffer),
}
}

func (set *nodeEventRecorder) nodeAdded(b *bucket, n *node) {
select {
case set.evc <- recordedNodeEvent{n, true}:
default:
panic("no space in event buffer")
}
}

func (set *nodeEventRecorder) nodeRemoved(b *bucket, n *node) {
select {
case set.evc <- recordedNodeEvent{n, false}:
default:
panic("no space in event buffer")
}
}

func (set *nodeEventRecorder) waitNodePresent(id enode.ID, timeout time.Duration) bool {
return set.waitNodeEvent(id, timeout, true)
}

func (set *nodeEventRecorder) waitNodeAbsent(id enode.ID, timeout time.Duration) bool {
return set.waitNodeEvent(id, timeout, false)
}

func (set *nodeEventRecorder) waitNodeEvent(id enode.ID, timeout time.Duration, added bool) bool {
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case ev := <-set.evc:
if ev.node.ID() == id && ev.added == added {
return true
}
case <-timer.C:
return false
}
}
}

0 comments on commit 9faae76

Please sign in to comment.