我有一个go api项目,我也运行了一个worker(RabbitMQ),我刚刚发现了一个问题,我的worker和我的http listen和serve不能一起工作,我运行worker的那一刻,api的端口就达不到了。
下面是我的代码。
app.go
func (a *App) StartWorker() {
connection, err := amqp091.Dial(os.Getenv("AMQP_URL"))
if err != nil {
panic(err)
}
defer connection.Close()
consumer, err := events.NewConsumer(connection, database.GetDatabase(a.Database))
if err != nil {
panic(err)
}
consumer.Listen(os.Args[1:])
}
func (a *App) Run(addr string) {
logs := log.New(os.Stdout, "my-service", log.LstdFlags)
server := &http.Server{
Addr: addr,
Handler: a.Router,
ErrorLog: logs,
IdleTimeout: 120 * time.Second, // max time for connections using TCP Keep-Alive
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
}
go func() {
if err := server.ListenAndServe(); err != nil {
logs.Fatal(err)
}
}()
// trap sigterm or interrupt and gracefully shutdown the server
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
signal.Notify(c, os.Kill)
sig := <-c
logs.Println("Recieved terminate, graceful shutdown", sig)
tc, _ := context.WithTimeout(context.Background(), 30*time.Second)
server.Shutdown(tc)
}
这是我
consumer.go
// NewConsumer returns a new Consumer
func NewConsumer(conn *amqp.Connection, db *mongo.Database) (Consumer, error) {
consumer := Consumer{
conn: conn,
db: db,
}
err := consumer.setup()
if err != nil {
return Consumer{}, err
}
return consumer, nil
}
// Listen will listen for all new Queue publications
// and print them to the console.
func (consumer *Consumer) Listen(topics []string) error {
ch, err := consumer.conn.Channel()
if err != nil {
return err
}
defer ch.Close()
if err != nil {
return err
}
msgs, err := ch.Consume("update.package.rating", "", true, false, false, false, nil)
if err != nil {
return err
}
forever := make(chan bool)
go func() {
for msg := range msgs {
switch msg.RoutingKey {
case "update.package.rating":
worker.RatePackage(packageRepo.NewPackagesRepository(consumer.db), msg.Body)
}
// acknowledege received event
log.Printf("Received a message: %s", msg.Body)
}
}()
log.Printf("[*] Waiting for message [Exchange, Queue][%s, %s]. To exit press CTRL+C", getExchangeName(), "update.package.rating")
<-forever
return nil
}
main.go
func main() {
start := app.App{}
start.StartApp()
start.StartWorker()
start.Run(":3006")
}
则不能到达端口3006。
我正在使用gin-gonic服务我的http请求。
欢迎任何帮助。
1条答案
按热度按时间nwsw7zdq1#
我在使用gin framework时遇到了类似的问题。通过在go例程中运行我的消费者解决了这个问题。我像下面这样调用我的消费者。
而且服务器和RabbitMQ消费者都无缝运行。仍然在监控性能,看看它是否足够健壮和弹性。