dockerfiles/anylink/server/handler/link_cstp.go

138 lines
2.6 KiB
Go
Raw Normal View History

2021-06-08 20:45:26 +08:00
package handler
import (
2021-08-26 23:09:52 +08:00
"bufio"
2021-06-08 20:45:26 +08:00
"encoding/binary"
"net"
"time"
"github.com/bjdgyc/anylink/base"
2021-08-26 23:09:52 +08:00
"github.com/bjdgyc/anylink/pkg/utils"
2021-06-08 20:45:26 +08:00
"github.com/bjdgyc/anylink/sessdata"
)
2021-08-26 23:09:52 +08:00
func LinkCstp(conn net.Conn, bufRW *bufio.ReadWriter, cSess *sessdata.ConnSession) {
2021-06-08 20:45:26 +08:00
defer func() {
base.Debug("LinkCstp return", cSess.IpAddr)
_ = conn.Close()
cSess.Close()
}()
var (
err error
n int
dataLen uint16
dead = time.Duration(cSess.CstpDpd+5) * time.Second
)
2021-08-26 23:09:52 +08:00
go cstpWrite(conn, bufRW, cSess)
2021-06-08 20:45:26 +08:00
for {
// 设置超时限制
2021-08-26 23:09:52 +08:00
err = conn.SetReadDeadline(utils.NowSec().Add(dead))
2021-06-08 20:45:26 +08:00
if err != nil {
base.Error("SetDeadline: ", err)
return
}
// hdata := make([]byte, BufferSize)
2021-08-02 20:41:35 +08:00
pl := getPayload()
2021-08-26 23:09:52 +08:00
n, err = bufRW.Read(pl.Data)
2021-06-08 20:45:26 +08:00
if err != nil {
base.Error("read hdata: ", err)
return
}
// 限流设置
err = cSess.RateLimit(n, true)
if err != nil {
base.Error(err)
}
2021-08-02 20:41:35 +08:00
switch pl.Data[6] {
2021-06-08 20:45:26 +08:00
case 0x07: // KEEPALIVE
// do nothing
// base.Debug("recv keepalive", cSess.IpAddr)
case 0x05: // DISCONNECT
base.Debug("DISCONNECT", cSess.IpAddr)
return
case 0x03: // DPD-REQ
// base.Debug("recv DPD-REQ", cSess.IpAddr)
2021-08-02 20:41:35 +08:00
pl.PType = 0x04
if payloadOutCstp(cSess, pl) {
2021-06-08 20:45:26 +08:00
return
}
case 0x04:
// log.Println("recv DPD-RESP")
case 0x00: // DATA
2021-08-02 20:41:35 +08:00
// 获取数据长度
dataLen = binary.BigEndian.Uint16(pl.Data[4:6]) // 4,5
// 去除数据头
copy(pl.Data, pl.Data[8:8+dataLen])
// 更新切片长度
pl.Data = pl.Data[:dataLen]
// pl.Data = append(pl.Data[:0], pl.Data[8:8+dataLen]...)
if payloadIn(cSess, pl) {
2021-06-08 20:45:26 +08:00
return
}
}
}
}
2021-08-26 23:09:52 +08:00
func cstpWrite(conn net.Conn, bufRW *bufio.ReadWriter, cSess *sessdata.ConnSession) {
2021-06-08 20:45:26 +08:00
defer func() {
base.Debug("cstpWrite return", cSess.IpAddr)
_ = conn.Close()
cSess.Close()
}()
var (
err error
n int
2021-08-02 20:41:35 +08:00
pl *sessdata.Payload
2021-06-08 20:45:26 +08:00
)
for {
select {
2021-08-02 20:41:35 +08:00
case pl = <-cSess.PayloadOutCstp:
2021-06-08 20:45:26 +08:00
case <-cSess.CloseChan:
return
}
2021-08-02 20:41:35 +08:00
if pl.LType != sessdata.LTypeIPData {
2021-06-08 20:45:26 +08:00
continue
}
2021-08-02 20:41:35 +08:00
if pl.PType == 0x00 {
// 获取数据长度
l := len(pl.Data)
// 先扩容 +8
pl.Data = pl.Data[:l+8]
// 数据后移
copy(pl.Data[8:], pl.Data)
// 添加头信息
copy(pl.Data[:8], plHeader)
// 更新头长度
binary.BigEndian.PutUint16(pl.Data[4:6], uint16(l))
} else {
pl.Data = append(pl.Data[:0], plHeader...)
// 设置头类型
pl.Data[6] = pl.PType
2021-06-08 20:45:26 +08:00
}
2021-08-02 20:41:35 +08:00
n, err = conn.Write(pl.Data)
2021-06-08 20:45:26 +08:00
if err != nil {
base.Error("write err", err)
return
}
2021-08-02 20:41:35 +08:00
putPayload(pl)
2021-06-08 20:45:26 +08:00
// 限流设置
err = cSess.RateLimit(n, false)
if err != nil {
base.Error(err)
}
}
}