commit 8354e5bc6417f4cab09b599c39331c2e389a95e1
parent 68b84d7de9f80a201534bab0e1a68fac8c2e77a7
Author: francoispqt <francois@parquet.ninja>
Date: Sun, 20 May 2018 00:01:24 +0800
add mutex to encoding stream
Diffstat:
2 files changed, 11 insertions(+), 2 deletions(-)
diff --git a/encode_stream.go b/encode_stream.go
@@ -2,6 +2,7 @@ package gojay
import (
"strconv"
+ "sync"
"time"
)
@@ -15,6 +16,7 @@ type MarshalerStream interface {
//
// It implements conext.Context and provide a channel to notify interruption.
type StreamEncoder struct {
+ mux *sync.RWMutex
*Encoder
nConsumer int
delimiter byte
@@ -39,10 +41,12 @@ func (s *StreamEncoder) EncodeStream(m MarshalerStream) {
// resulting in a weird JSON
go consume(s, s, m)
for i := 1; i < s.nConsumer; i++ {
+ s.mux.RLock()
ss := Stream.borrowEncoder(s.w)
ss.done = s.done
ss.buf = make([]byte, 0, 512)
ss.delimiter = s.delimiter
+ s.mux.RUnlock()
go consume(s, ss, m)
}
return
@@ -115,12 +119,14 @@ func (s *StreamEncoder) Value(key interface{}) interface{} {
//
// After calling cancel, Done() will return a closed channel.
func (s *StreamEncoder) Cancel(err error) {
+ s.mux.Lock()
select {
case <-s.done:
default:
s.err = err
close(s.done)
}
+ s.mux.Unlock()
}
// AddObject adds an object to be encoded.
diff --git a/encode_stream_pool.go b/encode_stream_pool.go
@@ -1,13 +1,16 @@
package gojay
-import "io"
+import (
+ "io"
+ "sync"
+)
// NewEncoder returns a new StreamEncoder.
// It takes an io.Writer implementation to output data.
// It initiates the done channel returned by Done().
func (s stream) NewEncoder(w io.Writer) *StreamEncoder {
enc := BorrowEncoder(w)
- return &StreamEncoder{Encoder: enc, nConsumer: 1, done: make(chan struct{}, 1)}
+ return &StreamEncoder{Encoder: enc, nConsumer: 1, done: make(chan struct{}, 1), mux: &sync.RWMutex{}}
}
// BorrowEncoder borrows a StreamEncoder from the pool.