dockerfiles/anylink/dtls-2.0.9/internal/net/dpipe/dpipe.go

145 lines
2.8 KiB
Go
Raw Normal View History

2021-06-08 20:45:26 +08:00
// Package dpipe provides the pipe works like datagram protocol on memory.
package dpipe
import (
"context"
"io"
"net"
"sync"
"time"
"github.com/pion/transport/deadline"
)
// Pipe creates pair of non-stream conn on memory.
// Close of the one end doesn't make effect to the other end.
func Pipe() (net.Conn, net.Conn) {
ch0 := make(chan []byte, 1000)
ch1 := make(chan []byte, 1000)
return &conn{
rCh: ch0,
wCh: ch1,
closed: make(chan struct{}),
closing: make(chan struct{}),
readDeadline: deadline.New(),
writeDeadline: deadline.New(),
}, &conn{
rCh: ch1,
wCh: ch0,
closed: make(chan struct{}),
closing: make(chan struct{}),
readDeadline: deadline.New(),
writeDeadline: deadline.New(),
}
}
type pipeAddr struct{}
func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string { return ":1" }
type conn struct {
rCh chan []byte
wCh chan []byte
closed chan struct{}
closing chan struct{}
closeOnce sync.Once
readDeadline *deadline.Deadline
writeDeadline *deadline.Deadline
}
func (*conn) LocalAddr() net.Addr { return pipeAddr{} }
func (*conn) RemoteAddr() net.Addr { return pipeAddr{} }
func (c *conn) SetDeadline(t time.Time) error {
c.readDeadline.Set(t)
c.writeDeadline.Set(t)
return nil
}
func (c *conn) SetReadDeadline(t time.Time) error {
c.readDeadline.Set(t)
return nil
}
func (c *conn) SetWriteDeadline(t time.Time) error {
c.writeDeadline.Set(t)
return nil
}
func (c *conn) Read(data []byte) (n int, err error) {
select {
case <-c.closed:
return 0, io.EOF
case <-c.closing:
if len(c.rCh) == 0 {
return 0, io.EOF
}
case <-c.readDeadline.Done():
return 0, context.DeadlineExceeded
default:
}
for {
select {
case d := <-c.rCh:
if len(d) <= len(data) {
copy(data, d)
return len(d), nil
}
copy(data, d[:len(data)])
return len(data), nil
case <-c.closed:
return 0, io.EOF
case <-c.closing:
if len(c.rCh) == 0 {
return 0, io.EOF
}
case <-c.readDeadline.Done():
return 0, context.DeadlineExceeded
}
}
}
func (c *conn) cleanWriteBuffer() {
for {
select {
case <-c.wCh:
default:
return
}
}
}
func (c *conn) Write(data []byte) (n int, err error) {
select {
case <-c.closed:
return 0, io.ErrClosedPipe
case <-c.writeDeadline.Done():
c.cleanWriteBuffer()
return 0, context.DeadlineExceeded
default:
}
cData := make([]byte, len(data))
copy(cData, data)
select {
case <-c.closed:
return 0, io.ErrClosedPipe
case <-c.writeDeadline.Done():
c.cleanWriteBuffer()
return 0, context.DeadlineExceeded
case c.wCh <- cData:
return len(cData), nil
}
}
func (c *conn) Close() error {
c.closeOnce.Do(func() {
close(c.closed)
})
return nil
}