Golang socket.io服务器由于某种原因出现故障

x33g5p2x  于 2023-08-01  发布在  Go
关注(0)|答案(2)|浏览(227)

在做了一些测试Golang and Android Kotlin code以尝试socket.io之后,我将这些代码复制到我的项目的服务器和客户端应用程序中。
我与原始代码的唯一不同之处在于,由于必要性,我将socket服务器启动为coroutine,因为仅仅调用StartSocket似乎本质上是一个阻塞函数。
更新后,我测试了代码是否仍然有效,它确实有效。应用可以连接到服务器,并且应用也可以向服务器发射,并且IIRC应用还可以从服务器接收发射。
当我重新构建应用程序时,服务器显示客户端断开连接。现在,只有连接部分工作。即使是原来的应用程序现在也不能发射,重建应用程序似乎也不会再断开它。客户端再次重复断开连接,但现在它是无声的,就像终端上只显示一条新的连接消息。在重复断开连接之前,至少告诉客户端断开连接的一些原因。
如果我go run原始代码并将其与原始应用程序配对,一切都正常。我go build我的项目代码,但我怀疑这应该会影响socket.io。当我几乎没有改变任何东西的时候,为什么一切(大部分)都不起作用了,我不知所措。
下面是我的Golang代码:

package helpers

import (
    "flag"
    "fmt"
    "log"
    "net/http"

    socketio "github.com/googollee/go-socket.io"
    "github.com/googollee/go-socket.io/engineio"
    "github.com/googollee/go-socket.io/engineio/transport"
    "github.com/googollee/go-socket.io/engineio/transport/polling"
    "github.com/googollee/go-socket.io/engineio/transport/websocket"
)

var allowOriginFunc = func(r *http.Request) bool {
    return true
}
var (
    port = flag.Int("socket_server_port", 8000, "Socket sckServer port")
)

var sckServer *socketio.Server

const WARNING_TAG = "warning"
const ALERT_TAG = "alert"
const NAMESPACE = "notifications"
const SIMPLE_TAG = "simple"
const ROOM = "notif_room"

func StartSocket() {
    flag.Parse()

    sckServer = socketio.NewServer(&engineio.Options{
        Transports: []transport.Transport{
            &polling.Transport{
                CheckOrigin: allowOriginFunc,
            },
            &websocket.Transport{
                CheckOrigin: allowOriginFunc,
            },
        },
    })

    sckServer.OnConnect("/", func(s socketio.Conn) error {
        s.SetContext("")
        fmt.Println("connected:", s.ID())

        s.Emit("notice", "new user connected")
        return nil
    })

    sckServer.OnEvent("/", "notice", func(s socketio.Conn, msg string) {
        fmt.Println("notice:", msg)
        s.Emit("notice", "have "+msg)
    })

    sckServer.OnError("/", func(s socketio.Conn, e error) {
        fmt.Println("socket error:", e)
    })

    sckServer.OnDisconnect("/", func(s socketio.Conn, reason string) {
        fmt.Println("closed", reason)
    })

    go sckServer.Serve()
    defer sckServer.Close()

    http.Handle("/socket.io/", sckServer)
    http.Handle("/", http.FileServer(http.Dir("./asset")))

    fmt.Printf("Socket sckServer serving at localhost:%d...\n", *port)

    err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil)

    if err != nil {
        log.Fatalf("Failed to start socket sckServer: %v\n", err)
    }
}

func GetSocketSrv() *socketio.Server {
    return sckServer
}

func BroadcastToTag(tag string, payload string) {
    fmt.Printf("BroadcastToTag tag: %s, payload: %s\n", tag, payload)

    if sckServer != nil {
        broadcastStat := sckServer.BroadcastToNamespace(NAMESPACE, tag, payload)
        fmt.Printf("broadcastStat: %v\n", broadcastStat)
    } else {
        fmt.Printf("sckServer = nil\n")
    }
}

字符串
下面是我的AndroidKotlin代码:

import android.util.Log
import io.socket.client.IO
import io.socket.client.Socket
import io.socket.emitter.Emitter
import java.net.ConnectException
import java.net.URISyntaxException

class SocketHelper {
    private lateinit var mSocket: Socket
    private val onAlertNotif =
        Emitter.Listener { args ->
            Log.i(TAG, "onAlertNotif args: ${args[0]}")
        }

    private val onWarningNotif =
        Emitter.Listener { args ->
            Log.i(TAG, "onWarningNotif args: ${args[0]}")
        }

    private val onSimpleNotif =
        Emitter.Listener { args ->
            Log.i(TAG, "onSimpleNotif args: ${args[0]}")
        }

    init {
        try {
            mSocket = IO.socket("http://<local_ip>:8000/")
        }catch (e: ConnectException) {
            Log.e(TAG, "Socket ConnExc: ${e.localizedMessage}")
        }catch (e: URISyntaxException) {
            Log.e(TAG, "Socket URISynExc: ${e.localizedMessage}")
        }catch (e: Exception){
            Log.e(TAG, "Socket Exc: ${e.localizedMessage}")
        }
    }

    fun send(eventName: String, msg: String){
        mSocket.emit(eventName, msg)
    }

    fun open(){
        mSocket.on("alert", onAlertNotif)
        mSocket.on("warning", onWarningNotif)
        mSocket.on("simple", onSimpleNotif)

        mSocket.connect()
    }

    fun stop(){
        mSocket.off()
        mSocket.disconnect()
    }

    companion object{
        const val TAG = "SocketHelper"
    }
}

class MainActivity : AppCompatActivity() {
    private val socketHelper = SocketHelper()
    
    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        socketHelper.open()
    }
    
     override fun onDestroy() {
        super.onDestroy()

        socketHelper.stop()
    }
}


更新:
作为更新,我还将从服务器端分享main.go,因为它可能对你们有帮助:

package main

import (
    "flag"
    "fmt"
    "log"
    "net"

    pb "github.com/<me>/<project_name>/api/proto/out"
    cmmHelpers "github.com/<me>/<project_name>/cmd/commons/helpers"
    "github.com/<me>/<project_name>/cmd/server/handlers"
    "github.com/<me>/<project_name>/cmd/server/helpers"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)

func init() {
    cmmHelpers.DatabaseConnection()
}

var (
    tls      = flag.Bool("tls", true, "Connection uses TLS if true, else plain TCP")
    certFile = flag.String("cert_file", "", "The TLS cert file")
    keyFile  = flag.String("key_file", "", "The TLS key file")
    port     = flag.Int("port", 50051, "The server port")
)

func main() {
    flag.Parse()

    lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    var opts []grpc.ServerOption
    if *tls {
        if *certFile == "" {
            *certFile = "service.pem"
        }

        if *keyFile == "" {
            *keyFile = "service.key"
        }

        creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)

        if err != nil {
            log.Fatalf("Failed to generate credentials: %v", err)
        }

        opts = []grpc.ServerOption{grpc.Creds(creds)}
    }

    mServ := grpc.NewServer(opts...)

    fmt.Println("gRPC server running ...")

    //some gRPC related boiler plate

    log.Printf("Server listening at %v", lis.Addr())

    go helpers.StartSocket()

    if err := mServ.Serve(lis); err != nil {
        log.Fatalf("failed to serve : %v", err)
    }
}

vc6uscn9

vc6uscn91#

我已经找到答案了,但它是毁灭性的。我完全抛弃了socket.io,使用了内置的net包。如果需要的话,我将在后面说明如何实现socket.io的那些奇特特性。代码如下所示:

import (
    "flag"
    "fmt"
    "log"
    "net"
    "os"
)

var (
    SOCKET_PORT   = flag.Int("socket_server_port", 9001, "Socket sckServer port")
    HOST          = flag.String("host name", "localhost", "server host name")
    socketConnArr = make(map[string]net.Conn)
)

func StartSocket() {
    listen, err := net.Listen(TYPE, fmt.Sprintf(":%d", *SOCKET_PORT))
    if err != nil {
        log.Fatalf("conn err: %s", err)
        os.Exit(1)
    }

    defer listen.Close()

    fmt.Printf("Socket sckServer serving at localhost:%d...\n", *SOCKET_PORT)

    for {
        conn, err := listen.Accept()
        if err != nil {
            log.Fatalf("accept err: %s", err)
            continue
        }
        socketConnArr[conn.RemoteAddr().String()] = conn

        fmt.Printf("socket client on: %s\n", conn.RemoteAddr().String())

        go connWrite(conn, "socket server ack")
        go connRead(conn)
    }
}

func connRead(conn net.Conn) {
    fmt.Println("Reading")
    buffer := make([]byte, 1024)
    bytesRead, err := conn.Read(buffer)
    if err != nil {
        log.Fatalf("req err: %s", err)
    }

    fmt.Printf("Read %d bytes: %s", bytesRead, string(buffer[:bytesRead]))
}

func connWrite(conn net.Conn, message string) {
    fmt.Println("Sending")
    conn.Write([]byte("\n")) //for some reason, this is very important
    conn.Write([]byte(message))
    conn.Write([]byte("\n"))
}

字符串
Android方面:

lifecycleScope.launch(Dispatchers.IO) {
    var response = ""

    try {
        socket = Socket(SERVER_IP, SERVER_PORT)
        writer = PrintWriter(socket.getOutputStream())
        reader = BufferedReader(InputStreamReader(socket.getInputStream()))

        writer.println(socketInitMsg)
        writer.flush()

        while (true) {
            if (reader.readLine() != null) {
                response = reader.readLine()
                Log.i(SocketHelper.TAG, "response: $response")
            }
        }
    } catch (e: IOException) {
        Log.e(SocketHelper.TAG, "err1: " + e.localizedMessage)
    } catch (e: Exception) {
        Log.e(SocketHelper.TAG, "err11: " + e.localizedMessage)
    }
}

sc4hvdpw

sc4hvdpw2#

首先,确保您没有并发问题:由于您将socket服务器作为Go例程运行,因此它可能会由于启动它的主函数的生命周期而过早终止。
确保main函数在服务器处理完所有连接之前不会终止。
在提供的main.go代码中,您在启动gRPC服务器之前调用了go helpers.StartSocket()。如果gRPC服务器关闭(由于错误或程序终止),StartSocket goroutine也可能被终止。
另见“Starting a socket server interferes with the gRPC/http client server communication Golang
和往常一样,检查你的错误!可能在服务器端或客户端发生错误,但未正确捕获该错误。

  • 在你的Kotlin代码中,你目前只捕获了ConnectExceptionURISyntaxException。最好还捕获一个泛型Exception作为回退,因为可能会发生其他未预料到的异常。
  • 在你的Go代码中,sckServer.Serve()可能会返回一个应该被处理或记录的错误。
  • BroadcastToTag函数可以检查广播是否成功,并处理或记录可能发生的任何错误。但是,BroadcastToNamespace函数在go-socket.io库中的当前实现中似乎没有返回错误。

或者,添加错误值:在GetSocketSrv函数中,当sckServernil时返回错误是一种更好的做法,因为它表明套接字服务器尚未初始化。这允许调用者适当地处理这种情况,而不是冒恐慌的风险。例如,如果BroadcastToTagStartSocket之前或在socket服务器关闭之后被调用,则sckServer.BroadcastToNamespace将导致死机,因为sckServernil
为了改进错误处理和更好地管理你的goroutine的生命周期,你的代码可以包含这些更新:

package helpers

    import (
        "flag"
        "fmt"
        "log"
        "net/http"
        "sync"

        socketio "github.com/googollee/go-socket.io"
        "github.com/googollee/go-socket.io/engineio"
        "github.com/googollee/go-socket.io/engineio/transport"
        "github.com/googollee/go-socket.io/engineio/transport/polling"
        "github.com/googollee/go-socket.io/engineio/transport/websocket"
    )

    var allowOriginFunc = func(r *http.Request) bool {
        return true
    }
    var (
        port = flag.Int("socket_server_port", 8000, "Socket sckServer port")
    )

    var sckServer *socketio.Server
    var wg sync.WaitGroup  // WaitGroup to manage goroutines' lifecycle

    // Constants, as before
    //...

    func StartSocket() {
        flag.Parse()

        sckServer = socketio.NewServer(&engineio.Options{
            Transports: []transport.Transport{
                &polling.Transport{
                    CheckOrigin: allowOriginFunc,
                },
                &websocket.Transport{
                    CheckOrigin: allowOriginFunc,
                },
            },
        })

        // Event handlers, as before
        //...

        go func() {
            defer wg.Done()  // When this goroutine finishes, mark one job as done
            sckServer.Serve()
        }()
        wg.Add(1)  // We have one job running

        defer func() {
            sckServer.Close()  // Make sure to close the server when the function exits
            wg.Wait()  // Wait for all jobs to finish
        }()

        // HTTP handlers, as before
        //...

        fmt.Printf("Socket sckServer serving at localhost:%d...\n", *port)

        err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil)

        if err != nil {
            log.Fatalf("Failed to start socket sckServer: %v\n", err)
        }
    }

    func GetSocketSrv() (*socketio.Server, error) {
        if sckServer == nil {
            return nil, fmt.Errorf("Socket server is not initialized")
        }

        return sckServer, nil
    }

    // BroadcastToTag function, as before
    //...

字符串
对于main.go

package main

    import (
        // Imports, as before
        //...
    )

    // Initializations, as before
    //...

    func main() {
        flag.Parse()

        lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
        if err != nil {
            log.Fatalf("failed to listen: %v", err)
        }

        var opts []grpc.ServerOption
        if *tls {
            // Certificate and key file handling, as before
            //...
        }

        mServ := grpc.NewServer(opts...)

        fmt.Println("gRPC server running ...")

        // gRPC related boilerplate, as before
        //...

        log.Printf("Server listening at %v", lis.Addr())

        go func() {
            if err := helpers.StartSocket(); err != nil {
                log.Fatalf("Failed to start socket server: %v", err)
            }
        }()

        if err := mServ.Serve(lis); err != nil {
            log.Fatalf("failed to serve : %v", err)
        }
    }


请将“Imports,as before”和“gRPC related boilerplate,as before”注解替换为原始代码。
我已经用更简单的NET库替换了这些代码
一个简单的TCP服务器/客户端实现,服务器端使用Go's net package,客户端使用Kotlin's Ktor library(不完全复制所有socket.io功能,特别是与名称空间和广播相关的功能):

Go服务器:

package main

import (
    "bufio"
    "fmt"
    "log"
    "net"
    "strings"
)

func handleConnection(c net.Conn) {
    fmt.Printf("Serving %s\n", c.RemoteAddr().String())
    for {
        netData, err := bufio.NewReader(c).ReadString('\n')
        if err != nil {
            fmt.Println(err)
            return
        }

        temp := strings.TrimSpace(string(netData))
        if temp == "STOP" {
            break
        }

        fmt.Println(temp)
        c.Write([]byte("Received data: " + temp + "\n"))
    }
    c.Close()
}

func main() {
    l, err := net.Listen("tcp4", ":5000")
    if err != nil {
        log.Fatalf("Failed to start server: %v\n", err)
    }
    defer l.Close()

    for {
        c, err := l.Accept()
        if err != nil {
            fmt.Println(err)
            return
        }
        go handleConnection(c)
    }
}


Go服务器将侦听TCP端口5000上的传入连接。
当接收到一个连接时,它将创建一个新的goroutine来独立于其他连接处理该连接。
然后,它将逐行读取传入的数据,将其打印出来,并使用“Received data:”前缀将其回显到客户端。

Kotlin客户端:

import io.ktor.network.sockets.*
import io.ktor.utils.io.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.net.InetSocketAddress

fun main() = runBlocking {
    val socket = aSocket(Dispatchers.IO).tcp().connect(InetSocketAddress("localhost", 5000))

    val input = socket.openReadChannel()
    val output = socket.openWriteChannel(autoFlush = true)

    output.writeUTF8Line("Hello, Server!")
    val response = input.readUTF8Line()
    println("Server said: '$response'")

    output.writeUTF8Line("STOP")
    socket.close()
}


Kotlin客户端将连接到服务器,发送问候消息,等待响应,打印它,然后在关闭连接之前发送“STOP”消息。
此代码使用Kotlin协程和Ktor's aSocket method打开TCP连接。
注意:只有当服务器和客户端运行在同一台机器上时,此代码才有效。如果它们位于不同的计算机上,请将"localhost"替换为服务器的IP地址。
请记住在Kotlin依赖项中包含Ktor库(ktor-network):

dependencies {
    implementation 'io.ktor:ktor-network:2.3.2'
}

相关问题