我正在尝试使用Golang实现一个以太网交换机作为我的UNI项目的一部分。但是现在,当我运行测试用例时,它得到了一个竞争条件,我无法修复这个问题。
下面是实际的问题陈述和完整的细节。我将在这里分享实现的代码细节。
详细信息:Github Gist
到目前为止我所做的:
package eth
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"log"
"time"
)
type MACAddress [6]byte
func (addr MACAddress) String() string {
return net.HardwareAddr(addr[:]).String()
}
// BroadcastAddress is the MAC address to use when the desire it to broadcast to all nodes on the LAN.
var BroadcastAddress = MACAddress{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
// Frame is an ethernet frame
// see https://en.wikipedia.org/wiki/Ethernet_frame
type Frame struct {
Source MACAddress
Destination MACAddress
Data []byte
}
// String implements fmt.Stringer
func (f *Frame) String() string {
if len(f.Data) <= 2 {
return fmt.Sprintf("%s->%s data %v", f.Source, f.Destination, f.Data)
}
// large packet
return fmt.Sprintf("%s->%s length %d", f.Source, f.Destination, len(f.Data))
}
// Port represents a physical ethernet port on a switch
type Port interface {
io.ReadWriteCloser
}
// EthernetSwitch implements the ethernet switch functionality
type EthernetSwitch struct {
sendQueueSize int
ports []Port
macTable map[MACAddress]Port
tableMutex chan struct{}
wg chan struct{}
}
// NewEthernetSwitch create a new switch with sendQueueSize for the size of the sending buffers and the provided ports.
func NewEthernetSwitch(sendQueueSize int, ports ...Port) *EthernetSwitch {
return &EthernetSwitch{
sendQueueSize: sendQueueSize,
ports: ports,
macTable: make(map[MACAddress]Port),
tableMutex: make(chan struct{}, 1),
wg: make(chan struct{}, len(ports)),
}
}
// Run the ethernet switch.
// Blocks until all the io.Readers of the ports are closed (return io.EOF).
// Returns any unrecoverable error from reading (other than io.EOF) or writing to the ports.
// Before returning, this closes the writer for each port.
func (sw *EthernetSwitch) Run() error {
// Start a goroutine for each port to handle reading frames and forwarding them
sw.wg = make(chan struct{}, len(sw.ports))
for _, port := range sw.ports {
go func(port Port) {
defer port.Close() // Close the writer when the goroutine exits
for {
frame, err := ReadFrame(port)
if err != nil {
if err == io.EOF {
sw.wg <- struct{}{}
return
}
continue
}
sw.forwardFrame(port, frame)
}
}(port)
}
// Wait for all the goroutines to finish
for range sw.ports {
<-sw.wg
}
close(sw.wg) // Close the channel after all goroutines have finished
return nil
}
func (sw *EthernetSwitch) forwardFrame(from Port, frame *Frame) {
sw.tableMutex <- struct{}{}
sw.macTable[frame.Source] = from
port, ok := sw.macTable[frame.Destination]
<-sw.tableMutex
timeout := time.After(10 * time.Millisecond)
if !ok {
// Broadcast if the destination MAC address is not in the table
for _, p := range sw.ports {
if p != from {
done := make(chan error, 1)
go func() {
_, err := WriteFrame(p, *frame)
done <- err
}()
select {
case err := <-done:
if err != nil {
log.Fatal(err)
}
case <-timeout:
log.Println("WriteFrame timed out")
}
}
}
} else {
done := make(chan error, 1)
go func() {
_, err := WriteFrame(port, *frame)
done <- err
}()
select {
case err := <-done:
if err != nil {
log.Fatal(err)
}
case <-timeout:
log.Println("WriteFrame timed out")
}
}
}
func (sw *EthernetSwitch) RunSize() int {
sw.tableMutex <- struct{}{}
defer func() { <-sw.tableMutex }()
size := len(sw.macTable)
return size
}
// ReadFrame reads a single frame from r.
// If the frame is not valid, return a nil Frame and a nil error.
func ReadFrame(r io.Reader) (*Frame, error) {
var header [14]byte
if _, err := io.ReadFull(r, header[:]); err != nil {
return nil, err
}
lengthOrType := binary.BigEndian.Uint16(header[12:14])
var data []byte
if lengthOrType <= 1500 {
data = make([]byte, lengthOrType)
if _, err := io.ReadFull(r, data); err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("invalid Ethernet II frame: lengthOrType field is greater than 1500")
}
// Read the CRC32 checksum.
var receivedChecksum uint32
err := binary.Read(r, binary.BigEndian, &receivedChecksum) // Use BigEndian for consistency
if err != nil {
return nil, err
}
// Calculate the CRC32 checksum of the received frame (header and data).
buf := &bytes.Buffer{}
buf.Write(header[:])
buf.Write(data)
calculatedChecksum := crc32.ChecksumIEEE(buf.Bytes())
// Compare the received checksum with the calculated checksum.
if receivedChecksum != calculatedChecksum {
return nil, nil // Return nil Frame and nil error if the checksums don't match
}
return &Frame{
Destination: MACAddress{header[0], header[1], header[2], header[3], header[4], header[5]},
Source: MACAddress{header[6], header[7], header[8], header[9], header[10], header[11]},
Data: data,
}, nil
}
func WriteFrame(w io.Writer, frame Frame) (int, error) {
buf := &bytes.Buffer{}
if len(frame.Data) > 1500 {
return 0, fmt.Errorf("invalid Ethernet II frame: data length is greater than 1500 bytes")
}
header := make([]byte, 14)
copy(header[0:6], frame.Destination[:])
copy(header[6:12], frame.Source[:])
binary.BigEndian.PutUint16(header[12:14], uint16(len(frame.Data)))
// Write the header and frame data to the buffer
n, err := buf.Write(header)
if err != nil {
return n, err
}
m, err := buf.Write(frame.Data)
if err != nil {
return n + m, err
}
// Calculate the CRC32 checksum of the entire frame.
checksum := crc32.ChecksumIEEE(buf.Bytes())
// Write the frame (header and data) to the actual writer.
_, err = buf.WriteTo(w)
if err != nil {
return 0, err
}
// Write the checksum to the writer.
err = binary.Write(w, binary.BigEndian, checksum) // Use BigEndian for consistency
if err != nil {
return 0, err
}
return n + m + 4, nil // Return the correct length here
}
错误栈跟踪:
=== RUN TestSwitch_Simple
switch_test.go:219: Starting with 2 goroutines
=== RUN TestSwitch_Simple/discovery
=== RUN TestSwitch_Simple/discovery/unicast
=== RUN TestSwitch_Simple/discovery/unicast/regular
switch_test.go:254: Inside with 9 goroutines
=== RUN TestSwitch_Simple/discovery/unicast/drop_packets
2023/04/18 11:05:50 WriteFrame timed out
2023/04/18 11:05:50 WriteFrame timed out
2023/04/18 11:05:50 WriteFrame timed out
switch_test.go:279: Frame was not expected to be nil
=== RUN TestSwitch_Simple/broadcast_frame
2023/04/18 11:05:50 WriteFrame timed out
2023/04/18 11:05:50 WriteFrame timed out
panic: test timed out after 2s
running tests:
TestSwitch_Simple (2s)
TestSwitch_Simple/broadcast_frame (2s)
goroutine 56 [running]:
testing.(*M).startAlarm.func1()
/opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:2241 +0x1b0
created by time.goFunc
/opt/homebrew/Cellar/go/1.20.3/libexec/src/time/sleep.go:176 +0x48
goroutine 1 [chan receive]:
testing.(*T).Run(0xc000082b60, {0x100dbca2f, 0x11}, 0x100e36da0)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1630 +0x604
testing.runTests.func1(0x0?)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:2036 +0x84
testing.tRunner(0xc000082b60, 0xc000095b38)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1576 +0x18c
testing.runTests(0xc0000b8320?, {0x100f20fe0, 0x4, 0x4}, {0xc000095c08?, 0x100c790ec?, 0x100f25a40?})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:2034 +0x704
testing.(*M).Run(0xc0000b8320)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1906 +0x954
main.main()
_testmain.go:55 +0x304
goroutine 28 [chan receive]:
testing.(*T).Run(0xc0000f4340, {0x100dbc34d, 0xf}, 0xc000204210)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1630 +0x604
ethswitch/pkg/eth_test.TestSwitch_Simple(0xc0000f4340)
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch_test.go:286 +0xa24
testing.tRunner(0xc0000f4340, 0x100e36da0)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1576 +0x18c
created by testing.(*T).Run
/opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1629 +0x5e8
goroutine 29 [chan receive]:
ethswitch/pkg/eth.(*EthernetSwitch).Run(0xc0000a4980)
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:93 +0x248
ethswitch/pkg/eth_test.TestSwitch_Simple.func1()
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch_test.go:235 +0x38
created by ethswitch/pkg/eth_test.TestSwitch_Simple
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch_test.go:234 +0x880
goroutine 68 [select]:
io.(*pipe).read(0xc0000a25a0, {0xc00018e02b, 0x4da, 0x580000101010000?})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:57 +0xdc
io.(*PipeReader).Read(0xc0000a00e8, {0xc00018e02b, 0x4da, 0x4da})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:136 +0x54
io.ReadAtLeast({0x100e37bb8, 0xc0000a00e8}, {0xc00018e000, 0x505, 0x505}, 0x505)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:332 +0xcc
io.ReadFull(...)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:351
ethswitch/pkg/eth.ReadFrame({0x100e37bb8, 0xc0000a00e8})
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:165 +0xec
ethswitch/pkg/eth_test.expectFrame(0xc0002104e0, {0x100e37bb8, 0xc0000a00e8}, 0xc000204270)
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch_test.go:172 +0x48
ethswitch/pkg/eth_test.TestSwitch_Simple.func3(0x0?)
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch_test.go:289 +0x1f0
testing.tRunner(0xc0002104e0, 0xc000204210)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1576 +0x18c
created by testing.(*T).Run
/opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1629 +0x5e8
goroutine 31 [select]:
io.(*pipe).read(0xc0000a2480, {0xc0001084c2, 0xe, 0x100db28f0?})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:57 +0xdc
io.(*PipeReader).Read(0xc0000a00b8, {0xc0001084c2, 0xe, 0xe})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:136 +0x54
io.ReadAtLeast({0x12a271918, 0xc0000a48c0}, {0xc0001084c2, 0xe, 0xe}, 0xe)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:332 +0xcc
io.ReadFull(...)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:351
ethswitch/pkg/eth.ReadFrame({0x12a271918, 0xc0000a48c0})
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:156 +0x68
ethswitch/pkg/eth.(*EthernetSwitch).Run.func1({0x100e38270?, 0xc0000a48c0})
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:77 +0xa8
created by ethswitch/pkg/eth.(*EthernetSwitch).Run
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:74 +0xbc
goroutine 32 [select]:
io.(*pipe).read(0xc0000a2540, {0xc000108242, 0xe, 0x100db28f0?})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:57 +0xdc
io.(*PipeReader).Read(0xc0000a00d8, {0xc000108242, 0xe, 0xe})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:136 +0x54
io.ReadAtLeast({0x12a271918, 0xc0000a4900}, {0xc000108242, 0xe, 0xe}, 0xe)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:332 +0xcc
io.ReadFull(...)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:351
ethswitch/pkg/eth.ReadFrame({0x12a271918, 0xc0000a4900})
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:156 +0x68
ethswitch/pkg/eth.(*EthernetSwitch).Run.func1({0x100e38270?, 0xc0000a4900})
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:77 +0xa8
created by ethswitch/pkg/eth.(*EthernetSwitch).Run
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:74 +0xbc
goroutine 33 [select]:
io.(*pipe).read(0xc0000a2600, {0xc0000a6712, 0xe, 0x0?})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:57 +0xdc
io.(*PipeReader).Read(0xc0000a00f8, {0xc0000a6712, 0xe, 0xe})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:136 +0x54
io.ReadAtLeast({0x12a271918, 0xc0000a4940}, {0xc0000a6712, 0xe, 0xe}, 0xe)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:332 +0xcc
io.ReadFull(...)
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:351
ethswitch/pkg/eth.ReadFrame({0x12a271918, 0xc0000a4940})
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:156 +0x68
ethswitch/pkg/eth.(*EthernetSwitch).Run.func1({0x100e38270?, 0xc0000a4940})
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:77 +0xa8
created by ethswitch/pkg/eth.(*EthernetSwitch).Run
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:74 +0xbc
goroutine 55 [select]:
io.(*pipe).write(0xc0000a2660, {0xc000110500, 0xf, 0x40})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:86 +0x1f8
io.(*PipeWriter).Write(0xc0000a0110, {0xc000110500, 0xf, 0x40})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:165 +0x54
bytes.(*Buffer).WriteTo(0xc000107ef0, {0x12a303fe8, 0xc0000a4940})
/opt/homebrew/Cellar/go/1.20.3/libexec/src/bytes/buffer.go:252 +0xe4
ethswitch/pkg/eth.WriteFrame({0x12a303fe8, 0xc0000a4940}, {{0x5, 0x5, 0x5, 0x5, 0x5, 0x0}, {0xff, 0xff, ...}, ...})
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:220 +0x148
ethswitch/pkg/eth.(*EthernetSwitch).forwardFrame.func1()
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:114 +0xc0
created by ethswitch/pkg/eth.(*EthernetSwitch).forwardFrame
/Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:113 +0x54c
FAIL ethswitch/pkg/eth 2.515s
FAIL
1条答案
按热度按时间b09cbbtk1#
以下情况可能导致此问题:
在Run方法中的go-routine中,如果在
ReadFrame
期间收到错误,但err永远不等于io.EOF
,那么go-routine永远不会关闭自己,并且你的主例程也会停留在:如果它一直等到测试用例超时,那么测试用例将抛出一个panic。