gojay

high performance JSON encoder/decoder with stream API for Golang
git clone git://git.lair.cx/gojay
Log | Files | Refs | README | LICENSE

commit 77b2d39674ab75670f4173f8b9d6a46db75c49e4
parent 6614f5b29388bba644bfdbc36cc3d62b42459d09
Author: francoispqt <francois@parquet.ninja>
Date:   Wed,  9 May 2018 22:15:06 +0800

update switch to sync.pool for memory pooling, performance is much better than channe

Diffstat:
Mdecode_pool.go | 70+++++++++++++++++++++++-----------------------------------------------
Mdecode_pool_test.go | 7+++++--
Mdecode_stream_pool.go | 53++++++++++++++++++++++-------------------------------
Mdecode_stream_pool_test.go | 26+++++++-------------------
Mdecode_string_test.go | 7++++++-
Mencode.go | 5+++++
Mencode_pool.go | 57++++++++++++++++++++++++++-------------------------------
Mencode_pool_test.go | 10++++++++--
Mencode_stream.go | 7++-----
Mencode_stream_pool.go | 34+++++++++++++---------------------
Mencode_stream_pool_test.go | 15++++++++++++---
11 files changed, 129 insertions(+), 162 deletions(-)

diff --git a/decode_pool.go b/decode_pool.go @@ -1,25 +1,19 @@ package gojay -import "io" +import ( + "io" + "sync" +) -var decPool = make(chan *Decoder, 32) +var decPool = sync.Pool{ + New: func() interface{} { + return NewDecoder(nil) + }, +} func init() { -initStreamDecPool: - for { - select { - case streamDecPool <- Stream.NewDecoder(nil): - default: - break initStreamDecPool - } - } -initDecPool: - for { - select { - case decPool <- NewDecoder(nil): - default: - break initDecPool - } + for i := 0; i < 32; i++ { + decPool.Put(NewDecoder(nil)) } } @@ -45,33 +39,18 @@ func BorrowDecoder(r io.Reader) *Decoder { return borrowDecoder(r, 512) } func borrowDecoder(r io.Reader, bufSize int) *Decoder { - select { - case dec := <-decPool: - dec.called = 0 - dec.keysDone = 0 - dec.cursor = 0 - dec.err = nil - dec.r = r - dec.length = 0 - dec.isPooled = 0 - if bufSize > 0 { - dec.data = make([]byte, bufSize) - } - return dec - default: - dec := &Decoder{ - called: 0, - cursor: 0, - keysDone: 0, - err: nil, - r: r, - isPooled: 0, - } - if bufSize > 0 { - dec.data = make([]byte, bufSize) - } - return dec + dec := decPool.Get().(*Decoder) + dec.called = 0 + dec.keysDone = 0 + dec.cursor = 0 + dec.err = nil + dec.r = r + dec.length = 0 + dec.isPooled = 0 + if bufSize > 0 { + dec.data = make([]byte, bufSize) } + return dec } // Release sends back a Decoder to the pool. @@ -79,8 +58,5 @@ func borrowDecoder(r io.Reader, bufSize int) *Decoder { // a panic will be raised with an InvalidUsagePooledDecoderError error. func (dec *Decoder) Release() { dec.isPooled = 1 - select { - case decPool <- dec: - default: - } + decPool.Put(dec) } diff --git a/decode_pool_test.go b/decode_pool_test.go @@ -2,6 +2,7 @@ package gojay import ( "strings" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -9,9 +10,11 @@ import ( func TestDecoderBorrowFromPool(t *testing.T) { // reset pool - decPool = make(chan *Decoder, 16) + decPool = sync.Pool{New: func() interface{} { return NewDecoder(nil) }} + dec := decPool.New().(*Decoder) + decPool.Put(dec) // borrow decoder - dec := BorrowDecoder(strings.NewReader("")) + dec = BorrowDecoder(strings.NewReader("")) // release dec.Release() // get from pool diff --git a/decode_stream_pool.go b/decode_stream_pool.go @@ -1,8 +1,15 @@ package gojay -import "io" +import ( + "io" + "sync" +) -var streamDecPool = make(chan *StreamDecoder, 32) +var streamDecPool = sync.Pool{ + New: func() interface{} { + return Stream.NewDecoder(nil) + }, +} // NewDecoder returns a new StreamDecoder. // It takes an io.Reader implementation as data input. @@ -26,32 +33,19 @@ func (s stream) BorrowDecoder(r io.Reader) *StreamDecoder { } func (s stream) borrowDecoder(r io.Reader, bufSize int) *StreamDecoder { - select { - case streamDec := <-streamDecPool: - streamDec.called = 0 - streamDec.keysDone = 0 - streamDec.cursor = 0 - streamDec.err = nil - streamDec.r = r - streamDec.length = 0 - streamDec.isPooled = 0 - streamDec.done = make(chan struct{}, 1) - if bufSize > 0 { - streamDec.data = make([]byte, bufSize) - } - return streamDec - default: - dec := NewDecoder(r) - if bufSize > 0 { - dec.data = make([]byte, bufSize) - dec.length = 0 - } - streamDec := &StreamDecoder{ - Decoder: dec, - done: make(chan struct{}, 1), - } - return streamDec + streamDec := streamDecPool.Get().(*StreamDecoder) + streamDec.called = 0 + streamDec.keysDone = 0 + streamDec.cursor = 0 + streamDec.err = nil + streamDec.r = r + streamDec.length = 0 + streamDec.isPooled = 0 + streamDec.done = make(chan struct{}, 1) + if bufSize > 0 { + streamDec.data = make([]byte, bufSize) } + return streamDec } // Release sends back a Decoder to the pool. @@ -59,8 +53,5 @@ func (s stream) borrowDecoder(r io.Reader, bufSize int) *StreamDecoder { // a panic will be raised with an InvalidUsagePooledDecoderError error. func (dec *StreamDecoder) Release() { dec.isPooled = 1 - select { - case streamDecPool <- dec: - default: - } + streamDecPool.Put(dec) } diff --git a/decode_stream_pool_test.go b/decode_stream_pool_test.go @@ -1,6 +1,7 @@ package gojay import ( + "sync" "testing" "github.com/stretchr/testify/assert" @@ -8,10 +9,10 @@ import ( func TestDecodeStreamBorrow(t *testing.T) { // we override the pool chan - streamDecPool = make(chan *StreamDecoder, 1) + streamDecPool = sync.Pool{New: func() interface{} { return Stream.NewDecoder(nil) }} // add one decoder to the channel dec := Stream.NewDecoder(nil) - streamDecPool <- dec + streamDecPool.Put(dec) // borrow one decoder to the channel nDec := Stream.BorrowDecoder(nil) // make sure they are the same @@ -20,33 +21,20 @@ func TestDecodeStreamBorrow(t *testing.T) { func TestDecodeStreamBorrow1(t *testing.T) { // we override the pool chan - streamDecPool = make(chan *StreamDecoder, 1) + streamDecPool = sync.Pool{New: func() interface{} { return Stream.NewDecoder(nil) }} // add one decoder to the channel dec := Stream.NewDecoder(nil) - streamDecPool <- dec + streamDecPool.Put(dec) // reset streamDecPool - streamDecPool = make(chan *StreamDecoder, 1) + streamDecPool = sync.Pool{New: func() interface{} { return Stream.NewDecoder(nil) }} // borrow one decoder to the channel nDec := Stream.BorrowDecoder(nil) // make sure they are the same assert.NotEqual(t, dec, nDec, "decoder added to the pool and new decoder should be the same") } -func TestDecodeStreamBorrow2(t *testing.T) { - // we override the pool chan - streamDecPool = make(chan *StreamDecoder, 1) - // add one decoder to the channel - dec := Stream.NewDecoder(nil) - dec.data = make([]byte, 128) - streamDecPool <- dec - // borrow one decoder to the channel - nDec := Stream.BorrowDecoder(nil) - // make sure they are the same - assert.Equal(t, dec, nDec, "decoder added to the pool and new decoder should be the same") - assert.Equal(t, 512, len(nDec.data), "len of dec.data should be 512") -} func TestDecodeStreamBorrow3(t *testing.T) { // we override the pool chan - streamDecPool = make(chan *StreamDecoder, 16) + streamDecPool = sync.Pool{New: func() interface{} { return Stream.NewDecoder(nil) }} // borrow one decoder to the channel nDec := Stream.BorrowDecoder(nil) // make sure they are the same diff --git a/decode_string_test.go b/decode_string_test.go @@ -2,6 +2,7 @@ package gojay import ( "strings" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -58,7 +59,11 @@ func TestDecoderStringDecoderAPI(t *testing.T) { func TestDecoderStringPoolError(t *testing.T) { // reset the pool to make sure it's not full - decPool = make(chan *Decoder, 16) + decPool = sync.Pool{ + New: func() interface{} { + return NewDecoder(nil) + }, + } result := "" dec := NewDecoder(nil) dec.Release() diff --git a/encode.go b/encode.go @@ -187,6 +187,11 @@ type Encoder struct { err error } +func (enc *Encoder) Init(w io.Writer) { + enc.w = w + enc.buf = make([]byte, 0, 512) +} + // AppendBytes allows a modular usage by appending bytes manually to the current state of the buffer. func (enc *Encoder) AppendBytes(b []byte) { enc.writeBytes(b) diff --git a/encode_pool.go b/encode_pool.go @@ -1,26 +1,28 @@ package gojay -import "io" +import ( + "io" + "sync" +) -var encPool = make(chan *Encoder, 32) -var streamEncPool = make(chan *StreamEncoder, 32) +var encPool = sync.Pool{ + New: func() interface{} { + return NewEncoder(nil) + }, +} + +var streamEncPool = sync.Pool{ + New: func() interface{} { + return Stream.NewEncoder(nil) + }, +} func init() { -initStreamEncPool: - for { - select { - case streamEncPool <- Stream.NewEncoder(nil): - default: - break initStreamEncPool - } + for i := 0; i < 32; i++ { + encPool.Put(NewEncoder(nil)) } -initEncPool: - for { - select { - case encPool <- NewEncoder(nil): - default: - break initEncPool - } + for i := 0; i < 32; i++ { + streamEncPool.Put(Stream.NewEncoder(nil)) } } @@ -34,23 +36,16 @@ func newEncoder() *Encoder { // BorrowEncoder borrows an Encoder from the pool. func BorrowEncoder(w io.Writer) *Encoder { - select { - case enc := <-encPool: - enc.isPooled = 0 - enc.w = w - enc.err = nil - return enc - default: - return &Encoder{w: w, buf: make([]byte, 0, 512)} - } + enc := encPool.Get().(*Encoder) + enc.w = w + enc.buf = enc.buf[:0] + enc.isPooled = 0 + enc.err = nil + return enc } // Release sends back a Encoder to the pool. func (enc *Encoder) Release() { - enc.buf = enc.buf[:0] enc.isPooled = 1 - select { - case encPool <- enc: - default: - } + encPool.Put(enc) } diff --git a/encode_pool_test.go b/encode_pool_test.go @@ -1,6 +1,7 @@ package gojay import ( + "sync" "testing" "github.com/stretchr/testify/assert" @@ -8,9 +9,14 @@ import ( func TestEncoderNewFromPool(t *testing.T) { // reset pool - encPool = make(chan *Encoder, 16) + encPool = sync.Pool{ + New: func() interface{} { + return NewEncoder(nil) + }, + } + // get new Encoder - enc := NewEncoder(nil) + enc := encPool.New().(*Encoder) // add to pool enc.Release() // borrow encoder diff --git a/encode_stream.go b/encode_stream.go @@ -74,11 +74,8 @@ func (s *StreamEncoder) NConsumer(n int) *StreamEncoder { // If a decoder is used after calling Release // a panic will be raised with an InvalidUsagePooledDecoderError error. func (s *StreamEncoder) Release() { - s.Encoder.isPooled = 1 - select { - case streamEncPool <- s: - default: - } + s.isPooled = 1 + streamEncPool.Put(s) } // Done returns a channel that's closed when work is done. diff --git a/encode_stream_pool.go b/encode_stream_pool.go @@ -16,28 +16,20 @@ func (s stream) NewEncoder(w io.Writer) *StreamEncoder { // // If no StreamEncoder is available in the pool, it returns a fresh one func (s stream) BorrowEncoder(w io.Writer) *StreamEncoder { - select { - case streamEnc := <-streamEncPool: - streamEnc.isPooled = 0 - streamEnc.w = w - streamEnc.Encoder.err = nil - streamEnc.done = make(chan struct{}, 1) - streamEnc.Encoder.buf = make([]byte, 0, 512) - streamEnc.nConsumer = 1 - return streamEnc - default: - return s.NewEncoder(w) - } + streamEnc := streamEncPool.Get().(*StreamEncoder) + streamEnc.w = w + streamEnc.Encoder.err = nil + streamEnc.done = make(chan struct{}, 1) + streamEnc.Encoder.buf = streamEnc.buf[:0] + streamEnc.nConsumer = 1 + streamEnc.isPooled = 0 + return streamEnc } func (s stream) borrowEncoder(w io.Writer) *StreamEncoder { - select { - case streamEnc := <-streamEncPool: - streamEnc.isPooled = 0 - streamEnc.w = w - streamEnc.Encoder.err = nil - return streamEnc - default: - return s.NewEncoder(w) - } + streamEnc := streamEncPool.Get().(*StreamEncoder) + streamEnc.isPooled = 0 + streamEnc.w = w + streamEnc.Encoder.err = nil + return streamEnc } diff --git a/encode_stream_pool_test.go b/encode_stream_pool_test.go @@ -1,6 +1,7 @@ package gojay import ( + "sync" "testing" "github.com/stretchr/testify/assert" @@ -8,12 +9,20 @@ import ( func TestEncodeStreamBorrow1(t *testing.T) { // we override the pool chan - streamEncPool = make(chan *StreamEncoder, 1) + streamEncPool = sync.Pool{ + New: func() interface{} { + return Stream.NewEncoder(nil) + }, + } // add one decoder to the channel enc := Stream.NewEncoder(nil) - streamEncPool <- enc + streamEncPool.Put(enc) // reset streamEncPool - streamEncPool = make(chan *StreamEncoder, 1) + streamEncPool = sync.Pool{ + New: func() interface{} { + return Stream.NewEncoder(nil) + }, + } // borrow one decoder to the channel nEnc := Stream.BorrowEncoder(nil) // make sure they are the same