笑死 最近写了个转发助手 和你这个题完全重叠
写出来加双端调试 部署上线测试总共两天吧
来看看我这水平能开多少(
```go
// listener 省略
func handleConnection(conn net.Conn) {
s := &Session{
Cmd: link_start,
Remote: remote,
ServerSideRemote: conn.RemoteAddr().String(),
ClientID: clientID,
}
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
sessionManager.Set(s.ServerSideRemote, conn)
sessionManager.Bind(s.ServerSideRemote, cancel)
send, _ := json.Marshal(s)
token := mqttClient.Publish(controlTopic, defaultQOS, false, send)
if token.Wait() && token.Error() != nil {
slog.Error("Publish control packet failed:", "err", token.Error())
sessionManager.Remove(s.ServerSideRemote)
return
}
<-ctx.Done()
switch ctx.Err() {
case context.Canceled:
slog.Info("Link confirmed successfully:", "serversideremote", s.ServerSideRemote)
case context.DeadlineExceeded:
slog.Warn("Confirmation timeout after 5s:", "serversideremote", s.ServerSideRemote)
sessionManager.Remove(s.ServerSideRemote)
}
}
func runServerForwarding(s *Session) {
conn, ok := sessionManager.Get(s.ServerSideRemote)
if !ok {
slog.Error("Connection not found:", "connID", s.ServerSideRemote)
return
}
defer conn.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
topic := topicPrefix + s.ClientSideLocal
token := mqttClient.Subscribe(topic, defaultQOS, func(c mqtt.Client, m mqtt.Message) {
select {
case <-ctx.Done():
return
default:
slog.Debug("Recv client down:", "recv", m.Payload())
recv, err := hex.DecodeString(string(m.Payload()))
if err != nil {
slog.Error("HEX decode error:", "err", err)
cancel()
return
}
_, err = conn.Write(recv)
if err != nil {
slog.Warn("TCP write error:", "err", err)
cancel()
}
}
})
if token.Wait() && token.Error() != nil {
slog.Error("Subscribe client down error:", "err", token.Error())
return
}
buf := make([]byte, defaultBufSize)
for {
select {
case <-ctx.Done():
return
default:
conn.SetReadDeadline(time.Now().Add(readTimeout))
n, err := conn.Read(buf)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
slog.Debug("TCP read timeout:", "connID", s.ServerSideRemote)
continue
}
slog.Warn("TCP read error:", "err", err)
return
}
send := hex.EncodeToString(buf[:n])
slog.Debug("Send up:", "send", send)
topic := topicPrefix + s.ServerSideRemote
token := mqttClient.Publish(topic, defaultQOS, false, send)
if token.Wait() && token.Error() != nil {
slog.Error("Publish up error:", "err", token.Error())
return
}
}
}
}
```