// Package dpipe provides the pipe works like datagram protocol on memory. package dpipe import ( "context" "io" "net" "sync" "time" "github.com/pion/transport/deadline" ) // Pipe creates pair of non-stream conn on memory. // Close of the one end doesn't make effect to the other end. func Pipe() (net.Conn, net.Conn) { ch0 := make(chan []byte, 1000) ch1 := make(chan []byte, 1000) return &conn{ rCh: ch0, wCh: ch1, closed: make(chan struct{}), closing: make(chan struct{}), readDeadline: deadline.New(), writeDeadline: deadline.New(), }, &conn{ rCh: ch1, wCh: ch0, closed: make(chan struct{}), closing: make(chan struct{}), readDeadline: deadline.New(), writeDeadline: deadline.New(), } } type pipeAddr struct{} func (pipeAddr) Network() string { return "pipe" } func (pipeAddr) String() string { return ":1" } type conn struct { rCh chan []byte wCh chan []byte closed chan struct{} closing chan struct{} closeOnce sync.Once readDeadline *deadline.Deadline writeDeadline *deadline.Deadline } func (*conn) LocalAddr() net.Addr { return pipeAddr{} } func (*conn) RemoteAddr() net.Addr { return pipeAddr{} } func (c *conn) SetDeadline(t time.Time) error { c.readDeadline.Set(t) c.writeDeadline.Set(t) return nil } func (c *conn) SetReadDeadline(t time.Time) error { c.readDeadline.Set(t) return nil } func (c *conn) SetWriteDeadline(t time.Time) error { c.writeDeadline.Set(t) return nil } func (c *conn) Read(data []byte) (n int, err error) { select { case <-c.closed: return 0, io.EOF case <-c.closing: if len(c.rCh) == 0 { return 0, io.EOF } case <-c.readDeadline.Done(): return 0, context.DeadlineExceeded default: } for { select { case d := <-c.rCh: if len(d) <= len(data) { copy(data, d) return len(d), nil } copy(data, d[:len(data)]) return len(data), nil case <-c.closed: return 0, io.EOF case <-c.closing: if len(c.rCh) == 0 { return 0, io.EOF } case <-c.readDeadline.Done(): return 0, context.DeadlineExceeded } } } func (c *conn) cleanWriteBuffer() { for { select { case <-c.wCh: default: return } } } func (c *conn) Write(data []byte) (n int, err error) { select { case <-c.closed: return 0, io.ErrClosedPipe case <-c.writeDeadline.Done(): c.cleanWriteBuffer() return 0, context.DeadlineExceeded default: } cData := make([]byte, len(data)) copy(cData, data) select { case <-c.closed: return 0, io.ErrClosedPipe case <-c.writeDeadline.Done(): c.cleanWriteBuffer() return 0, context.DeadlineExceeded case c.wCh <- cData: return len(cData), nil } } func (c *conn) Close() error { c.closeOnce.Do(func() { close(c.closed) }) return nil }