docker pull bitnami/nats:2.10.11
docker run -ti -p 4222:4222 -p 8222:8222 --name nats bitnami/nats:2.10.11
依赖
go get github.com/nats-io/nats.go/@v1.33.1
推送消息
func Test_natsio_request(t *testing.T) {
url := nats.DefaultURL
nc, _ := nats.Connect(url)
defer nc.Drain()
// Matches all of the above
nc.Publish("foo", []byte("Hello World")) // Use the response
println("==================== request request request")
}
接收消息
func Test_natsio_resp(t *testing.T) {
url := nats.DefaultURL
nc, _ := nats.Connect(url)
defer nc.Drain()
nc.Subscribe("foo", func(m *nats.Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})
println("==================== resp ")
}
相关代码配置如上所示,最终我失败了,没有打印 Msg received on 这个打印。
我查看了官方的相关文档,b 站的使用视频,我进行了一些尝试,但是仍然没有成功。
·············································
请问我如何才能正确的接收消息呢。。。
1
RedBeanIce OP |
2
kumoocat 273 天前 1
官方示例有说:
https://natsbyexample.com/examples/messaging/pub-sub/go There are two circumstances when a published message won’t be delivered to a subscriber: The subscriber does not have an active connection to the server ... func TestNats(t *testing.T) { url := nats.DefaultURL nc, _ := nats.Connect(url) defer nc.Drain() var wg sync.WaitGroup wg.Add(1) nc.Subscribe("foo", func(m *nats.Msg) { defer wg.Done() fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data)) }) nc.Publish("foo", []byte("Hello World")) wg.Wait() } |
3
XCFOX 273 天前
我平常主要写 node.js ,为了回复你这个问题特意装了 go 的环境。
你的问题是接受程序(Test_natsio_resp) 从上到下走完直接完事儿了,没有等待下一条消息这个程序就退出了。 你需要想办法让程序等待消息进来,具体代码请参考: https://github.com/nats-io/go-nats-examples/blob/main/api-examples/subscribe_async/main.go |
4
XCFOX 273 天前 1
除了添加 WaitGroup 用于等待消息,你还得在发布端使用 defer nc.Close() 而不是 defer nc.Drain(),Drain 状态下是不能发布消息的。
|
5
RedBeanIce OP @kumoocat
@XCFOX 感谢,确实是可以了。。。。 单个文件内解决的代码如 kumoocat 所示 多个文件如下,主题先要启动接收端,再启动发送端。(注意:先启动发送端再启动接收端,消息会丢失。) ```go 发送端 func Test_natsio_request(t *testing.T) { url := nats.DefaultURL nc, err := nats.Connect(url) if err != nil { log.Fatal(err) return } defer nc.Close() // Matches all of the above nc.Publish("foo", []byte("Hello World")) // Use the response println("==================== request request request") } ``` ```go 接收端 func Test_natsio_resp(t *testing.T) { url := nats.DefaultURL nc, err := nats.Connect(url) if err != nil { log.Fatal(err) return } defer nc.Drain() var wg sync.WaitGroup wg.Add(1) _, err2 := nc.Subscribe("foo", func(m *nats.Msg) { defer wg.Done() fmt.Printf(time.Now().String(), "Msg received on [%s] : %s\n", m.Subject, string(m.Data)) }) if err2 != nil { log.Fatal(err2) return } wg.Wait() println("==================== resp ") } ``` |
6
RedBeanIce OP |
7
liuhan907 272 天前 1
@RedBeanIce 因为 nats 的消息是发送时没有接收者的话就会被丢弃,不会保存。如果有需求应该用 nats jetstream 。
|
8
RedBeanIce OP |