ETH交换机中的Go Race条件

lnvxswe2  于 2023-04-27  发布在  Go
关注(0)|答案(1)|浏览(145)

我正在尝试使用Golang实现一个以太网交换机作为我的UNI项目的一部分。但是现在,当我运行测试用例时,它得到了一个竞争条件,我无法修复这个问题。
下面是实际的问题陈述和完整的细节。我将在这里分享实现的代码细节。
详细信息:Github Gist

到目前为止我所做的:

package eth

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "hash/crc32"
    "io"
    "log"
    "time"
)

type MACAddress [6]byte

func (addr MACAddress) String() string {
    return net.HardwareAddr(addr[:]).String()
}

// BroadcastAddress is the MAC address to use when the desire it to broadcast to all nodes on the LAN.
var BroadcastAddress = MACAddress{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}

// Frame is an ethernet frame
// see https://en.wikipedia.org/wiki/Ethernet_frame
type Frame struct {
    Source      MACAddress
    Destination MACAddress
    Data        []byte
}

// String implements fmt.Stringer
func (f *Frame) String() string {
    if len(f.Data) <= 2 {
        return fmt.Sprintf("%s->%s data %v", f.Source, f.Destination, f.Data)
    }

    // large packet
    return fmt.Sprintf("%s->%s length %d", f.Source, f.Destination, len(f.Data))
}

// Port represents a physical ethernet port on a switch
type Port interface {
    io.ReadWriteCloser
}

// EthernetSwitch implements the ethernet switch functionality
type EthernetSwitch struct {
    sendQueueSize int
    ports         []Port
    macTable      map[MACAddress]Port
    tableMutex    chan struct{}
    wg            chan struct{}
}

// NewEthernetSwitch create a new switch with sendQueueSize for the size of the sending buffers and the provided ports.
func NewEthernetSwitch(sendQueueSize int, ports ...Port) *EthernetSwitch {
    return &EthernetSwitch{
        sendQueueSize: sendQueueSize,
        ports:         ports,
        macTable:      make(map[MACAddress]Port),
        tableMutex:    make(chan struct{}, 1),
        wg:            make(chan struct{}, len(ports)),
    }
}

// Run the ethernet switch.
// Blocks until all the io.Readers of the ports are closed (return io.EOF).
// Returns any unrecoverable error from reading (other than io.EOF) or writing to the ports.
// Before returning, this closes the writer for each port.
func (sw *EthernetSwitch) Run() error {
    // Start a goroutine for each port to handle reading frames and forwarding them
    sw.wg = make(chan struct{}, len(sw.ports))

    for _, port := range sw.ports {
        go func(port Port) {
            defer port.Close() // Close the writer when the goroutine exits
            for {
                frame, err := ReadFrame(port)
                if err != nil {
                    if err == io.EOF {
                        sw.wg <- struct{}{}
                        return
                    }
                    continue
                }

                sw.forwardFrame(port, frame)
            }
        }(port)
    }

    // Wait for all the goroutines to finish
    for range sw.ports {
        <-sw.wg
    }
    close(sw.wg) // Close the channel after all goroutines have finished

    return nil
}

func (sw *EthernetSwitch) forwardFrame(from Port, frame *Frame) {
    sw.tableMutex <- struct{}{}
    sw.macTable[frame.Source] = from
    port, ok := sw.macTable[frame.Destination]
    <-sw.tableMutex

    timeout := time.After(10 * time.Millisecond)

    if !ok {
        // Broadcast if the destination MAC address is not in the table
        for _, p := range sw.ports {
            if p != from {
                done := make(chan error, 1)
                go func() {
                    _, err := WriteFrame(p, *frame)
                    done <- err
                }()
                select {
                case err := <-done:
                    if err != nil {
                        log.Fatal(err)
                    }
                case <-timeout:
                    log.Println("WriteFrame timed out")
                }
            }
        }
    } else {
        done := make(chan error, 1)
        go func() {
            _, err := WriteFrame(port, *frame)
            done <- err
        }()
        select {
        case err := <-done:
            if err != nil {
                log.Fatal(err)
            }
        case <-timeout:
            log.Println("WriteFrame timed out")
        }
    }

}

func (sw *EthernetSwitch) RunSize() int {
    sw.tableMutex <- struct{}{}
    defer func() { <-sw.tableMutex }()
    size := len(sw.macTable)
    return size
}

// ReadFrame reads a single frame from r.
// If the frame is not valid, return a nil Frame and a nil error.
func ReadFrame(r io.Reader) (*Frame, error) {
    var header [14]byte
    if _, err := io.ReadFull(r, header[:]); err != nil {
        return nil, err
    }

    lengthOrType := binary.BigEndian.Uint16(header[12:14])

    var data []byte
    if lengthOrType <= 1500 {
        data = make([]byte, lengthOrType)
        if _, err := io.ReadFull(r, data); err != nil {
            return nil, err
        }
    } else {
        return nil, fmt.Errorf("invalid Ethernet II frame: lengthOrType field is greater than 1500")
    }

    // Read the CRC32 checksum.
    var receivedChecksum uint32
    err := binary.Read(r, binary.BigEndian, &receivedChecksum) // Use BigEndian for consistency
    if err != nil {
        return nil, err
    }

    // Calculate the CRC32 checksum of the received frame (header and data).
    buf := &bytes.Buffer{}
    buf.Write(header[:])
    buf.Write(data)
    calculatedChecksum := crc32.ChecksumIEEE(buf.Bytes())

    // Compare the received checksum with the calculated checksum.
    if receivedChecksum != calculatedChecksum {
        return nil, nil // Return nil Frame and nil error if the checksums don't match
    }

    return &Frame{
        Destination: MACAddress{header[0], header[1], header[2], header[3], header[4], header[5]},
        Source:      MACAddress{header[6], header[7], header[8], header[9], header[10], header[11]},
        Data:        data,
    }, nil
}

func WriteFrame(w io.Writer, frame Frame) (int, error) {
    buf := &bytes.Buffer{}
    if len(frame.Data) > 1500 {
        return 0, fmt.Errorf("invalid Ethernet II frame: data length is greater than 1500 bytes")
    }

    header := make([]byte, 14)
    copy(header[0:6], frame.Destination[:])
    copy(header[6:12], frame.Source[:])
    binary.BigEndian.PutUint16(header[12:14], uint16(len(frame.Data)))

    // Write the header and frame data to the buffer
    n, err := buf.Write(header)
    if err != nil {
        return n, err
    }
    m, err := buf.Write(frame.Data)
    if err != nil {
        return n + m, err
    }
    // Calculate the CRC32 checksum of the entire frame.
    checksum := crc32.ChecksumIEEE(buf.Bytes())
    // Write the frame (header and data) to the actual writer.
    _, err = buf.WriteTo(w)
    if err != nil {
        return 0, err
    }

    // Write the checksum to the writer.
    err = binary.Write(w, binary.BigEndian, checksum) // Use BigEndian for consistency
    if err != nil {
        return 0, err
    }
    return n + m + 4, nil // Return the correct length here
}

错误栈跟踪:

=== RUN   TestSwitch_Simple
    switch_test.go:219: Starting with 2 goroutines
=== RUN   TestSwitch_Simple/discovery
=== RUN   TestSwitch_Simple/discovery/unicast
=== RUN   TestSwitch_Simple/discovery/unicast/regular
    switch_test.go:254: Inside with 9 goroutines
=== RUN   TestSwitch_Simple/discovery/unicast/drop_packets
2023/04/18 11:05:50 WriteFrame timed out
2023/04/18 11:05:50 WriteFrame timed out
2023/04/18 11:05:50 WriteFrame timed out
    switch_test.go:279: Frame was not expected to be nil
=== RUN   TestSwitch_Simple/broadcast_frame
2023/04/18 11:05:50 WriteFrame timed out
2023/04/18 11:05:50 WriteFrame timed out
panic: test timed out after 2s
running tests:
        TestSwitch_Simple (2s)
        TestSwitch_Simple/broadcast_frame (2s)

goroutine 56 [running]:
testing.(*M).startAlarm.func1()
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:2241 +0x1b0
created by time.goFunc
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/time/sleep.go:176 +0x48

goroutine 1 [chan receive]:
testing.(*T).Run(0xc000082b60, {0x100dbca2f, 0x11}, 0x100e36da0)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1630 +0x604
testing.runTests.func1(0x0?)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:2036 +0x84
testing.tRunner(0xc000082b60, 0xc000095b38)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1576 +0x18c
testing.runTests(0xc0000b8320?, {0x100f20fe0, 0x4, 0x4}, {0xc000095c08?, 0x100c790ec?, 0x100f25a40?})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:2034 +0x704
testing.(*M).Run(0xc0000b8320)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1906 +0x954
main.main()
        _testmain.go:55 +0x304

goroutine 28 [chan receive]:
testing.(*T).Run(0xc0000f4340, {0x100dbc34d, 0xf}, 0xc000204210)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1630 +0x604
ethswitch/pkg/eth_test.TestSwitch_Simple(0xc0000f4340)
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch_test.go:286 +0xa24
testing.tRunner(0xc0000f4340, 0x100e36da0)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1576 +0x18c
created by testing.(*T).Run
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1629 +0x5e8

goroutine 29 [chan receive]:
ethswitch/pkg/eth.(*EthernetSwitch).Run(0xc0000a4980)
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:93 +0x248
ethswitch/pkg/eth_test.TestSwitch_Simple.func1()
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch_test.go:235 +0x38
created by ethswitch/pkg/eth_test.TestSwitch_Simple
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch_test.go:234 +0x880

goroutine 68 [select]:
io.(*pipe).read(0xc0000a25a0, {0xc00018e02b, 0x4da, 0x580000101010000?})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:57 +0xdc
io.(*PipeReader).Read(0xc0000a00e8, {0xc00018e02b, 0x4da, 0x4da})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:136 +0x54
io.ReadAtLeast({0x100e37bb8, 0xc0000a00e8}, {0xc00018e000, 0x505, 0x505}, 0x505)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:332 +0xcc
io.ReadFull(...)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:351
ethswitch/pkg/eth.ReadFrame({0x100e37bb8, 0xc0000a00e8})
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:165 +0xec
ethswitch/pkg/eth_test.expectFrame(0xc0002104e0, {0x100e37bb8, 0xc0000a00e8}, 0xc000204270)
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch_test.go:172 +0x48
ethswitch/pkg/eth_test.TestSwitch_Simple.func3(0x0?)
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch_test.go:289 +0x1f0
testing.tRunner(0xc0002104e0, 0xc000204210)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1576 +0x18c
created by testing.(*T).Run
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/testing/testing.go:1629 +0x5e8

goroutine 31 [select]:
io.(*pipe).read(0xc0000a2480, {0xc0001084c2, 0xe, 0x100db28f0?})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:57 +0xdc
io.(*PipeReader).Read(0xc0000a00b8, {0xc0001084c2, 0xe, 0xe})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:136 +0x54
io.ReadAtLeast({0x12a271918, 0xc0000a48c0}, {0xc0001084c2, 0xe, 0xe}, 0xe)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:332 +0xcc
io.ReadFull(...)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:351
ethswitch/pkg/eth.ReadFrame({0x12a271918, 0xc0000a48c0})
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:156 +0x68
ethswitch/pkg/eth.(*EthernetSwitch).Run.func1({0x100e38270?, 0xc0000a48c0})
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:77 +0xa8
created by ethswitch/pkg/eth.(*EthernetSwitch).Run
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:74 +0xbc

goroutine 32 [select]:
io.(*pipe).read(0xc0000a2540, {0xc000108242, 0xe, 0x100db28f0?})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:57 +0xdc
io.(*PipeReader).Read(0xc0000a00d8, {0xc000108242, 0xe, 0xe})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:136 +0x54
io.ReadAtLeast({0x12a271918, 0xc0000a4900}, {0xc000108242, 0xe, 0xe}, 0xe)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:332 +0xcc
io.ReadFull(...)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:351
ethswitch/pkg/eth.ReadFrame({0x12a271918, 0xc0000a4900})
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:156 +0x68
ethswitch/pkg/eth.(*EthernetSwitch).Run.func1({0x100e38270?, 0xc0000a4900})
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:77 +0xa8
created by ethswitch/pkg/eth.(*EthernetSwitch).Run
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:74 +0xbc

goroutine 33 [select]:
io.(*pipe).read(0xc0000a2600, {0xc0000a6712, 0xe, 0x0?})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:57 +0xdc
io.(*PipeReader).Read(0xc0000a00f8, {0xc0000a6712, 0xe, 0xe})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:136 +0x54
io.ReadAtLeast({0x12a271918, 0xc0000a4940}, {0xc0000a6712, 0xe, 0xe}, 0xe)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:332 +0xcc
io.ReadFull(...)
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/io.go:351
ethswitch/pkg/eth.ReadFrame({0x12a271918, 0xc0000a4940})
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:156 +0x68
ethswitch/pkg/eth.(*EthernetSwitch).Run.func1({0x100e38270?, 0xc0000a4940})
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:77 +0xa8
created by ethswitch/pkg/eth.(*EthernetSwitch).Run
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:74 +0xbc

goroutine 55 [select]:
io.(*pipe).write(0xc0000a2660, {0xc000110500, 0xf, 0x40})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:86 +0x1f8
io.(*PipeWriter).Write(0xc0000a0110, {0xc000110500, 0xf, 0x40})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/io/pipe.go:165 +0x54
bytes.(*Buffer).WriteTo(0xc000107ef0, {0x12a303fe8, 0xc0000a4940})
        /opt/homebrew/Cellar/go/1.20.3/libexec/src/bytes/buffer.go:252 +0xe4
ethswitch/pkg/eth.WriteFrame({0x12a303fe8, 0xc0000a4940}, {{0x5, 0x5, 0x5, 0x5, 0x5, 0x0}, {0xff, 0xff, ...}, ...})
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:220 +0x148
ethswitch/pkg/eth.(*EthernetSwitch).forwardFrame.func1()
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:114 +0xc0
created by ethswitch/pkg/eth.(*EthernetSwitch).forwardFrame
        /Volumes/Work/Fiverr/ethswitch-sainiketh07/pkg/eth/switch.go:113 +0x54c
FAIL    ethswitch/pkg/eth       2.515s
FAIL
b09cbbtk

b09cbbtk1#

以下情况可能导致此问题:
在Run方法中的go-routine中,如果在ReadFrame期间收到错误,但err永远不等于io.EOF,那么go-routine永远不会关闭自己,并且你的主例程也会停留在:

for range sw.ports {
    <-sw.wg //waiting until the channel has len(sw.ports) number of outputs
}

如果它一直等到测试用例超时,那么测试用例将抛出一个panic。

相关问题