gojay

high performance JSON encoder/decoder with stream API for Golang
git clone git://git.lair.cx/gojay
Log | Files | Refs | README | LICENSE

commit 8ef51f83cdce8c580333e02797c6e0693be5205f
parent c5402e694dcd8f75c0dce2882a51e6ee921ea5f1
Author: francoispqt <francois@parquet.ninja>
Date:   Tue,  1 May 2018 01:48:58 +0800

first commit add io.Writer support and stream api for encoding

Diffstat:
Mdecode_pool.go | 2+-
Mdecode_stream_pool.go | 17+++++++++++++++--
Mdecode_string_test.go | 2++
Mencode.go | 46+++++++++++++++++++++++++++++-----------------
Mencode_array.go | 2+-
Mencode_array_test.go | 4++--
Mencode_bool.go | 2+-
Mencode_bool_test.go | 6+++---
Mencode_builder.go | 2+-
Mencode_builder_test.go | 6+++---
Mencode_interface_test.go | 2+-
Mencode_number.go | 4++--
Mencode_number_test.go | 4++--
Mencode_object.go | 16+++++++++++-----
Mencode_object_test.go | 32++++++++++++++++++++++++++++++--
Mencode_pool.go | 19++++++++++++-------
Mencode_pool_test.go | 6+++---
Aencode_stream.go | 158+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aencode_stream_pool.go | 41+++++++++++++++++++++++++++++++++++++++++
Aencode_stream_pool_test.go | 21+++++++++++++++++++++
Aencode_stream_test.go | 189+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mencode_string.go | 2+-
Mencode_string_test.go | 2+-
23 files changed, 530 insertions(+), 55 deletions(-)

diff --git a/decode_pool.go b/decode_pool.go @@ -59,9 +59,9 @@ func borrowDecoder(r io.Reader, bufSize int) *Decoder { // If a decoder is used after calling Release // a panic will be raised with an InvalidUsagePooledDecoderError error. func (dec *Decoder) Release() { - dec.isPooled = 1 select { case decPool <- dec: + dec.isPooled = 1 default: } } diff --git a/decode_stream_pool.go b/decode_stream_pool.go @@ -4,7 +4,7 @@ import "io" var streamDecPool = make(chan *StreamDecoder, 16) -// NewDecoder returns a new decoder. +// NewDecoder returns a new StreamDecoder. // It takes an io.Reader implementation as data input. // It initiates the done channel returned by Done(). func (s stream) NewDecoder(r io.Reader) *StreamDecoder { @@ -16,9 +16,11 @@ func (s stream) NewDecoder(r io.Reader) *StreamDecoder { return streamDec } -// BorrowDecoder borrows a StreamDecoder a decoder from the pool. +// BorrowDecoder borrows a StreamDecoder from the pool. // It takes an io.Reader implementation as data input. // It initiates the done channel returned by Done(). +// +// If no StreamEncoder is available in the pool, it returns a fresh one func (s stream) BorrowDecoder(r io.Reader) *StreamDecoder { return s.borrowDecoder(r, 512) } @@ -51,3 +53,14 @@ func (s stream) borrowDecoder(r io.Reader, bufSize int) *StreamDecoder { return streamDec } } + +// Release sends back a Decoder to the pool. +// If a decoder is used after calling Release +// a panic will be raised with an InvalidUsagePooledDecoderError error. +func (dec *StreamDecoder) Release() { + select { + case streamDecPool <- dec: + dec.isPooled = 1 + default: + } +} diff --git a/decode_string_test.go b/decode_string_test.go @@ -57,6 +57,8 @@ func TestDecoderStringDecoderAPI(t *testing.T) { } func TestDecoderStringPoolError(t *testing.T) { + // reset the pool to make sure it's not full + decPool = make(chan *Decoder, 16) result := "" dec := NewDecoder(nil) dec.Release() diff --git a/encode.go b/encode.go @@ -2,6 +2,7 @@ package gojay import ( "fmt" + "io" "reflect" ) @@ -28,7 +29,7 @@ import ( // fmt.Println(b) // {"id":123456} // } func MarshalObject(v MarshalerObject) ([]byte, error) { - enc := NewEncoder() + enc := newEncoder() defer enc.Release() return enc.encodeObject(v) } @@ -55,7 +56,7 @@ func MarshalObject(v MarshalerObject) ([]byte, error) { // fmt.Println(b) // [{"id":123456},{"id":7890}] // } func MarshalArray(v MarshalerArray) ([]byte, error) { - enc := NewEncoder() + enc := newEncoder() enc.grow(200) enc.writeByte('[') v.(MarshalerArray).MarshalArray(enc) @@ -99,7 +100,7 @@ func Marshal(v interface{}) ([]byte, error) { var err error = InvalidTypeError("Unknown type to Marshal") switch vt := v.(type) { case MarshalerObject: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) enc.writeByte('{') vt.MarshalObject(enc) enc.writeByte('}') @@ -107,7 +108,7 @@ func Marshal(v interface{}) ([]byte, error) { defer enc.Release() return b, nil case MarshalerArray: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) enc.writeByte('[') vt.MarshalArray(enc) enc.writeByte(']') @@ -115,56 +116,56 @@ func Marshal(v interface{}) ([]byte, error) { defer enc.Release() return b, nil case string: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) b, err = enc.encodeString(vt) defer enc.Release() case bool: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) err = enc.AddBool(vt) b = enc.buf defer enc.Release() case int: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) b, err = enc.encodeInt(int64(vt)) defer enc.Release() case int64: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() return enc.encodeInt(vt) case int32: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() return enc.encodeInt(int64(vt)) case int16: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() return enc.encodeInt(int64(vt)) case int8: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() return enc.encodeInt(int64(vt)) case uint64: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() return enc.encodeInt(int64(vt)) case uint32: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() return enc.encodeInt(int64(vt)) case uint16: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() return enc.encodeInt(int64(vt)) case uint8: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) b, err = enc.encodeInt(int64(vt)) defer enc.Release() case float64: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() return enc.encodeFloat(vt) case float32: - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() return enc.encodeFloat(float64(vt)) default: @@ -189,6 +190,8 @@ type MarshalerArray interface { type Encoder struct { buf []byte isPooled byte + w io.Writer + err error } func (enc *Encoder) getPreviousRune() (byte, bool) { @@ -198,3 +201,12 @@ func (enc *Encoder) getPreviousRune() (byte, bool) { } return enc.buf[last], true } + +func (enc *Encoder) write() (int, error) { + i, err := enc.w.Write(enc.buf) + if err != nil { + enc.err = err + } + enc.buf = make([]byte, 0, 512) + return i, err +} diff --git a/encode_array.go b/encode_array.go @@ -38,7 +38,7 @@ func (enc *Encoder) AddArrayKey(key string, value MarshalerArray) error { } enc.writeByte('"') enc.writeString(key) - enc.write(objKeyArr) + enc.writeBytes(objKeyArr) value.MarshalArray(enc) enc.writeByte(']') return nil diff --git a/encode_array_test.go b/encode_array_test.go @@ -136,7 +136,7 @@ func TestEncoderArrayInterfacesEncoderAPI(t *testing.T) { testBool: true, }, } - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() r, err := enc.EncodeArray(v) assert.Nil(t, err, "Error should be nil") @@ -149,7 +149,7 @@ func TestEncoderArrayInterfacesEncoderAPI(t *testing.T) { func TestEncoderArrayPooledError(t *testing.T) { v := &testEncodingArrInterfaces{} - enc := BorrowEncoder() + enc := BorrowEncoder(nil) enc.Release() defer func() { err := recover() diff --git a/encode_bool.go b/encode_bool.go @@ -42,7 +42,7 @@ func (enc *Encoder) AddBoolKey(key string, value bool) error { } enc.writeByte('"') enc.writeString(key) - enc.write(objKey) + enc.writeBytes(objKey) enc.buf = strconv.AppendBool(enc.buf, value) return nil } diff --git a/encode_bool_test.go b/encode_bool_test.go @@ -7,7 +7,7 @@ import ( ) func TestEncoderBoolTrue(t *testing.T) { - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() b, err := enc.EncodeBool(true) assert.Nil(t, err, "err must be nil") @@ -15,7 +15,7 @@ func TestEncoderBoolTrue(t *testing.T) { } func TestEncoderBoolFalse(t *testing.T) { - enc := BorrowEncoder() + enc := BorrowEncoder(nil) defer enc.Release() b, err := enc.EncodeBool(false) assert.Nil(t, err, "err must be nil") @@ -23,7 +23,7 @@ func TestEncoderBoolFalse(t *testing.T) { } func TestEncoderBoolPoolError(t *testing.T) { - enc := BorrowEncoder() + enc := BorrowEncoder(nil) enc.Release() defer func() { err := recover() diff --git a/encode_builder.go b/encode_builder.go @@ -20,7 +20,7 @@ func (enc *Encoder) grow(n int) { // Write appends the contents of p to b's Buffer. // Write always returns len(p), nil. -func (enc *Encoder) write(p []byte) { +func (enc *Encoder) writeBytes(p []byte) { enc.buf = append(enc.buf, p...) } diff --git a/encode_builder_test.go b/encode_builder_test.go @@ -2,15 +2,16 @@ package gojay import ( "testing" + "github.com/stretchr/testify/assert" ) func TestEncoderBuilderError(t *testing.T) { - enc := NewEncoder() + enc := NewEncoder(nil) defer func() { err := recover() assert.NotNil(t, err, "err is not nil as we pass an invalid number to grow") }() enc.grow(-1) assert.True(t, false, "should not be called") -} -\ No newline at end of file +} diff --git a/encode_interface_test.go b/encode_interface_test.go @@ -122,7 +122,7 @@ var encoderTestCases = []struct { func TestEncoderInterfaceAllTypesDecoderAPI(t *testing.T) { for _, test := range encoderTestCases { - enc := BorrowEncoder() + enc := BorrowEncoder(nil) b, err := enc.Encode(test.v) enc.Release() test.expectations(t, b, err) diff --git a/encode_number.go b/encode_number.go @@ -61,7 +61,7 @@ func (enc *Encoder) AddIntKey(key string, value int) error { } enc.writeByte('"') enc.writeString(key) - enc.write(objKey) + enc.writeBytes(objKey) enc.buf = strconv.AppendInt(enc.buf, int64(value), 10) return nil @@ -75,7 +75,7 @@ func (enc *Encoder) AddFloatKey(key string, value float64) error { } enc.writeByte('"') enc.writeString(key) - enc.write(objKey) + enc.writeBytes(objKey) enc.buf = strconv.AppendFloat(enc.buf, value, 'f', -1, 64) return nil diff --git a/encode_number_test.go b/encode_number_test.go @@ -104,7 +104,7 @@ func TestEncoderFloat(t *testing.T) { func TestEncoderIntPooledError(t *testing.T) { v := 1 - enc := BorrowEncoder() + enc := BorrowEncoder(nil) enc.Release() defer func() { err := recover() @@ -118,7 +118,7 @@ func TestEncoderIntPooledError(t *testing.T) { func TestEncoderFloatPooledError(t *testing.T) { v := 1.1 - enc := BorrowEncoder() + enc := BorrowEncoder(nil) enc.Release() defer func() { err := recover() diff --git a/encode_object.go b/encode_object.go @@ -6,11 +6,17 @@ var objKeyArr = []byte(`":[`) var objKey = []byte(`":`) // EncodeObject encodes an object to JSON -func (enc *Encoder) EncodeObject(v MarshalerObject) ([]byte, error) { +func (enc *Encoder) EncodeObject(v MarshalerObject) error { if enc.isPooled == 1 { panic(InvalidUsagePooledEncoderError("Invalid usage of pooled encoder")) } - return enc.encodeObject(v) + _, _ = enc.encodeObject(v) + _, err := enc.write() + if err != nil { + enc.err = err + return err + } + return nil } func (enc *Encoder) encodeObject(v MarshalerObject) ([]byte, error) { enc.grow(200) @@ -21,7 +27,7 @@ func (enc *Encoder) encodeObject(v MarshalerObject) ([]byte, error) { } // AddObject adds an object to be encoded, must be used inside a slice or array encoding (does not encode a key) -// value must implement Marshaler +// value must implement MarshalerObject func (enc *Encoder) AddObject(value MarshalerObject) error { if value.IsNil() { return nil @@ -37,7 +43,7 @@ func (enc *Encoder) AddObject(value MarshalerObject) error { } // AddObjectKey adds a struct to be encoded, must be used inside an object as it will encode a key -// value must implement Marshaler +// value must implement MarshalerObject func (enc *Encoder) AddObjectKey(key string, value MarshalerObject) error { if value.IsNil() { return nil @@ -48,7 +54,7 @@ func (enc *Encoder) AddObjectKey(key string, value MarshalerObject) error { } enc.writeByte('"') enc.writeString(key) - enc.write(objKeyObj) + enc.writeBytes(objKeyObj) value.MarshalObject(enc) enc.writeByte('}') return nil diff --git a/encode_object_test.go b/encode_object_test.go @@ -1,6 +1,8 @@ package gojay import ( + "errors" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -52,6 +54,32 @@ func TestEncoderObjectBasic(t *testing.T) { "Result of marshalling is different as the one expected", ) } +func TestEncoderObjectBasicEncoderApi(t *testing.T) { + builder := &strings.Builder{} + enc := NewEncoder(builder) + err := enc.EncodeObject(&testObject{"漢字", 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.1, 1.1, true}) + assert.Nil(t, err, "Error should be nil") + assert.Equal( + t, + `{"testStr":"漢字","testInt":1,"testInt64":1,"testInt32":1,"testInt16":1,"testInt8":1,"testUint64":1,"testUint32":1,"testUint16":1,"testUint8":1,"testFloat64":1.1,"testFloat32":1.1,"testBool":true}`, + builder.String(), + "Result of marshalling is different as the one expected", + ) +} + +type TestWiterError string + +func (t TestWiterError) Write(b []byte) (int, error) { + return 0, errors.New("Test Error") +} + +func TestEncoderObjectBasicEncoderApiError(t *testing.T) { + w := TestWiterError("") + enc := NewEncoder(w) + err := enc.EncodeObject(&testObject{"漢字", 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.1, 1.1, true}) + assert.NotNil(t, err, "Error should not be nil") + assert.Equal(t, "Test Error", err.Error(), "err.Error() should be 'Test Error'") +} type TestEncoding struct { test string @@ -250,7 +278,7 @@ func TestEncoderObjectInterfaces(t *testing.T) { func TestEncoderObjectPooledError(t *testing.T) { v := &TestEncoding{} - enc := BorrowEncoder() + enc := BorrowEncoder(nil) enc.Release() defer func() { err := recover() @@ -258,6 +286,6 @@ func TestEncoderObjectPooledError(t *testing.T) { assert.IsType(t, InvalidUsagePooledEncoderError(""), err, "err should be of type InvalidUsagePooledEncoderError") assert.Equal(t, "Invalid usage of pooled encoder", err.(InvalidUsagePooledEncoderError).Error(), "err should be of type InvalidUsagePooledDecoderError") }() - _, _ = enc.EncodeObject(v) + _ = enc.EncodeObject(v) assert.True(t, false, "should not be called as it should have panicked") } diff --git a/encode_pool.go b/encode_pool.go @@ -1,23 +1,28 @@ package gojay -var encObjPool = make(chan *Encoder, 16) +import "io" + +var encPool = make(chan *Encoder, 16) +var streamEncPool = make(chan *StreamEncoder, 16) // NewEncoder returns a new encoder or borrows one from the pool -func NewEncoder() *Encoder { - return &Encoder{} +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w: w} } func newEncoder() *Encoder { return &Encoder{} } // BorrowEncoder borrows an Encoder from the pool. -func BorrowEncoder() *Encoder { +func BorrowEncoder(w io.Writer) *Encoder { select { - case enc := <-encObjPool: + case enc := <-encPool: enc.isPooled = 0 + enc.w = w + enc.buf = make([]byte, 0) return enc default: - return &Encoder{} + return &Encoder{w: w} } } @@ -25,7 +30,7 @@ func BorrowEncoder() *Encoder { func (enc *Encoder) Release() { enc.buf = nil select { - case encObjPool <- enc: + case encPool <- enc: enc.isPooled = 1 default: } diff --git a/encode_pool_test.go b/encode_pool_test.go @@ -8,13 +8,13 @@ import ( func TestEncoderNewFromPool(t *testing.T) { // reset pool - encObjPool = make(chan *Encoder, 16) + encPool = make(chan *Encoder, 16) // get new Encoder - enc := NewEncoder() + enc := NewEncoder(nil) // add to pool enc.Release() // borrow encoder - nEnc := BorrowEncoder() + nEnc := BorrowEncoder(nil) // make sure it's the same assert.Equal(t, enc, nEnc, "enc and nEnc from pool should be the same") } diff --git a/encode_stream.go b/encode_stream.go @@ -0,0 +1,158 @@ +package gojay + +import "strconv" + +// MarshalerStream is the interface to implement +// to continuously encode of stream of data. +type MarshalerStream interface { + MarshalStream(enc *StreamEncoder) error +} + +// A StreamEncoder reads and encodes values to JSON from an input stream. +// +// It implements conext.Context and provide a channel to notify interruption. +type StreamEncoder struct { + *Encoder + err error + nConsumer int + delimiter byte + done chan struct{} +} + +// EncodeStream spins up a defined number of non blocking consumers of the MarshalerStream m. +// +// m must implement MarshalerStream. Ideally m is a channel. See example for implementation. +// +// See the documentation for Marshal for details about the conversion of Go value to JSON. +func (s *StreamEncoder) EncodeStream(m MarshalerStream) { + // if a single consumer, just use this encoder + if s.nConsumer == 1 { + go consume(s, s, m) + return + } + // else use this Encoder only for first consumer + // and use new encoders for other consumers + // this is to avoid concurrent writing to same buffer + // resulting in a weird JSON + go consume(s, s, m) + for i := 1; i < s.nConsumer; i++ { + ss := Stream.borrowEncoder(s.w) + ss.done = s.done + ss.buf = make([]byte, 0, 512) + ss.delimiter = s.delimiter + go consume(s, ss, m) + } + return +} + +// LineDelimited sets the delimiter to a new line character. +// +// It will add a new line after each JSON marshaled by the MarshalerStream +func (s *StreamEncoder) LineDelimited() *StreamEncoder { + s.delimiter = '\n' + return s +} + +// CommaDelimited sets the delimiter to a comma. +// +// It will add a new line after each JSON marshaled by the MarshalerStream +func (s *StreamEncoder) CommaDelimited() *StreamEncoder { + s.delimiter = ',' + return s +} + +// NConsumer sets the number of non blocking go routine to consume the stream. +func (s *StreamEncoder) NConsumer(n int) *StreamEncoder { + s.nConsumer = n + return s +} + +// Release sends back a Decoder to the pool. +// If a decoder is used after calling Release +// a panic will be raised with an InvalidUsagePooledDecoderError error. +func (s *StreamEncoder) Release() { + select { + case streamEncPool <- s: + s.Encoder.isPooled = 1 + default: + } +} + +// Done returns a channel that's closed when work is done. +// It implements context.Context +func (s *StreamEncoder) Done() <-chan struct{} { + return s.done +} + +// Err returns nil if Done is not yet closed. +// If Done is closed, Err returns a non-nil error explaining why. +// It implements context.Context +func (s *StreamEncoder) Err() error { + return s.err +} + +// Cancel cancels the consumers of the stream, interrupting the stream encoding. +// +// After calling cancel, Done() will return a closed channel. +func (s *StreamEncoder) Cancel(err error) { + select { + case <-s.done: + default: + s.err = err + close(s.done) + } +} + +// AddObject adds an object to be encoded, must be used inside a slice or array encoding (does not encode a key) +// value must implement MarshalerObject +func (s *StreamEncoder) AddObject(v MarshalerObject) error { + if v.IsNil() { + return nil + } + s.Encoder.writeByte('{') + v.MarshalObject(s.Encoder) + s.Encoder.writeByte('}') + s.Encoder.writeByte(s.delimiter) + return nil +} + +// AddInt adds an int to be encoded, must be used inside a slice or array encoding (does not encode a key) +func (s *StreamEncoder) AddInt(value int) error { + s.buf = strconv.AppendInt(s.buf, int64(value), 10) + s.Encoder.writeByte(s.delimiter) + return nil +} + +// AddFloat adds a float64 to be encoded, must be used inside a slice or array encoding (does not encode a key) +func (s *StreamEncoder) AddFloat(value float64) error { + s.buf = strconv.AppendFloat(s.buf, value, 'f', -1, 64) + s.Encoder.writeByte(s.delimiter) + return nil +} + +// Non exposed + +func consume(init *StreamEncoder, s *StreamEncoder, m MarshalerStream) { + defer s.Release() + for { + select { + case <-init.Done(): + return + default: + err := m.MarshalStream(s) + if err != nil { + init.Cancel(err) + return + } + i, err := s.Encoder.write() + if i == 0 { + init.Cancel(err) + return + } + if err != nil { + init.Cancel(err) + return + } + } + } +} diff --git a/encode_stream_pool.go b/encode_stream_pool.go @@ -0,0 +1,41 @@ +package gojay + +import "io" + +// NewEncoder returns a new StreamEncoder. +// It takes an io.Writer implementation to output data. +// It initiates the done channel returned by Done(). +func (s stream) NewEncoder(w io.Writer) *StreamEncoder { + enc := BorrowEncoder(w) + return &StreamEncoder{Encoder: enc, nConsumer: 1, done: make(chan struct{}, 1)} +} + +// BorrowEncoder borrows a StreamEncoder from the pool. +// It takes an io.Writer implementation to output data. +// It initiates the done channel returned by Done(). +// +// If no StreamEncoder is available in the pool, it returns a fresh one +func (s stream) BorrowEncoder(w io.Writer) *StreamEncoder { + select { + case streamEnc := <-streamEncPool: + streamEnc.isPooled = 0 + streamEnc.w = w + streamEnc.done = make(chan struct{}, 1) + streamEnc.Encoder.buf = make([]byte, 0, 512) + streamEnc.nConsumer = 1 + return streamEnc + default: + return s.NewEncoder(w) + } +} + +func (s stream) borrowEncoder(w io.Writer) *StreamEncoder { + select { + case streamEnc := <-streamEncPool: + streamEnc.isPooled = 0 + streamEnc.w = w + return streamEnc + default: + return s.NewEncoder(w) + } +} diff --git a/encode_stream_pool_test.go b/encode_stream_pool_test.go @@ -0,0 +1,21 @@ +package gojay + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestEncodeStreamBorrow1(t *testing.T) { + // we override the pool chan + streamEncPool = make(chan *StreamEncoder, 1) + // add one decoder to the channel + enc := Stream.NewEncoder(nil) + streamEncPool <- enc + // reset streamEncPool + streamEncPool = make(chan *StreamEncoder, 1) + // borrow one decoder to the channel + nEnc := Stream.BorrowEncoder(nil) + // make sure they are the same + assert.NotEqual(t, enc, nEnc, "encoder added to the pool and new decoder should be the same") +} diff --git a/encode_stream_test.go b/encode_stream_test.go @@ -0,0 +1,189 @@ +package gojay + +import ( + "errors" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +type StreamChanObject chan *testObject + +func (s StreamChanObject) MarshalStream(enc *StreamEncoder) error { + select { + case <-enc.Done(): + return enc.Err() + case o := <-s: + return enc.AddObject(o) + } +} + +type StreamChanInt chan int + +func (s StreamChanInt) MarshalStream(enc *StreamEncoder) error { + select { + case <-enc.Done(): + return enc.Err() + case o := <-s: + return enc.AddInt(o) + } +} + +type StreamChanFloat chan float64 + +func (s StreamChanFloat) MarshalStream(enc *StreamEncoder) error { + select { + case <-enc.Done(): + return enc.Err() + case o := <-s: + return enc.AddFloat(o) + } +} + +type StreamChanError chan *testObject + +func (s StreamChanError) MarshalStream(enc *StreamEncoder) error { + select { + case <-enc.Done(): + return enc.Err() + case <-s: + return errors.New("Test Error") + } +} + +func TestEncodeStreamSingleConsumer(t *testing.T) { + expectedStr := + `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false} +` + // create our writer + w := &TestWriter{target: 100, mux: &sync.Mutex{}} + enc := Stream.NewEncoder(w).LineDelimited() + w.enc = enc + s := StreamChanObject(make(chan *testObject)) + go enc.EncodeStream(s) + go feedStream(s, 100) + select { + case <-enc.Done(): + assert.Len(t, w.result, 100, "w.result should be 100") + for _, b := range w.result { + assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string") + } + } +} +func TestEncodeStreamSingleConsumerInt(t *testing.T) { + // create our writer + w := &TestWriter{target: 100, mux: &sync.Mutex{}} + enc := Stream.NewEncoder(w).LineDelimited() + w.enc = enc + s := StreamChanInt(make(chan int)) + go enc.EncodeStream(s) + go feedStreamInt(s, 100) + select { + case <-enc.Done(): + assert.Len(t, w.result, 100, "w.result should be 100") + } +} +func TestEncodeStreamSingleConsumerFloat(t *testing.T) { + // create our writer + w := &TestWriter{target: 100, mux: &sync.Mutex{}} + enc := Stream.NewEncoder(w).LineDelimited() + w.enc = enc + s := StreamChanFloat(make(chan float64)) + go enc.EncodeStream(s) + go feedStreamFloat(s, 100) + select { + case <-enc.Done(): + assert.Len(t, w.result, 100, "w.result should be 100") + } +} +func TestEncodeStreamSingleConsumerMarshalError(t *testing.T) { + // create our writer + w := &TestWriter{target: 100, mux: &sync.Mutex{}} + enc := Stream.NewEncoder(w).LineDelimited() + w.enc = enc + s := StreamChanError(make(chan *testObject)) + go enc.EncodeStream(s) + go feedStream(s, 100) + select { + case <-enc.Done(): + assert.NotNil(t, enc.Err(), "enc.Err() should not be nil") + } +} +func TestEncodeStreamSingleConsumerCommaDelimited(t *testing.T) { + expectedStr := + `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false},` + // create our writer + w := &TestWriter{target: 5000, mux: &sync.Mutex{}} + enc := Stream.BorrowEncoder(w).NConsumer(50).CommaDelimited() + w.enc = enc + s := StreamChanObject(make(chan *testObject)) + go enc.EncodeStream(s) + go feedStream(s, 5000) + select { + case <-enc.Done(): + assert.Len(t, w.result, 5000, "w.result should be 100") + for _, b := range w.result { + assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string") + } + } +} + +func TestEncodeStreamMultipleConsumer(t *testing.T) { + expectedStr := + `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false} +` + // create our writer + w := &TestWriter{target: 5000, mux: &sync.Mutex{}} + enc := Stream.NewEncoder(w).NConsumer(50).LineDelimited() + w.enc = enc + s := StreamChanObject(make(chan *testObject)) + go enc.EncodeStream(s) + go feedStream(s, 5000) + select { + case <-enc.Done(): + assert.Len(t, w.result, 5000, "w.result should be 100") + for _, b := range w.result { + assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string") + } + } +} + +// TestWriter to assert result +type TestWriter struct { + nWrite *int + target int + enc *StreamEncoder + result [][]byte + mux *sync.Mutex +} + +func (w *TestWriter) Write(b []byte) (int, error) { + if len(b) > 0 { + w.mux.Lock() + w.result = append(w.result, b) + w.mux.Unlock() + if len(w.result) == w.target { + w.enc.Cancel(nil) + } + } + return len(b), nil +} + +func feedStream(s chan *testObject, target int) { + for i := 0; i < target; i++ { + s <- &testObject{} + } +} + +func feedStreamInt(s chan int, target int) { + for i := 0; i < target; i++ { + s <- i + } +} + +func feedStreamFloat(s chan float64, target int) { + for i := 0; i < target; i++ { + s <- float64(i) + } +} diff --git a/encode_string.go b/encode_string.go @@ -38,7 +38,7 @@ func (enc *Encoder) AddStringKey(key, value string) error { } enc.writeByte('"') enc.writeString(key) - enc.write(objKeyStr) + enc.writeBytes(objKeyStr) enc.writeString(value) enc.writeByte('"') diff --git a/encode_string_test.go b/encode_string_test.go @@ -28,7 +28,7 @@ func TestEncoderStringUTF8(t *testing.T) { func TestEncoderStringPooledError(t *testing.T) { v := "" - enc := BorrowEncoder() + enc := BorrowEncoder(nil) enc.Release() defer func() { err := recover()