commit ecb8d5814bccf5611b694b65ff05bd86b7b8ee1c
parent 8c1466b1729193cb6d476eda713f92387ffbf186
Author: francoispqt <francois@parquet.ninja>
Date: Tue, 1 May 2018 22:52:03 +0800
add methods to stream encoder and add tests, update readme
Diffstat:
5 files changed, 288 insertions(+), 142 deletions(-)
diff --git a/README.md b/README.md
@@ -493,14 +493,14 @@ type UnmarshalerStream interface {
Example of implementation of stream reading from a WebSocket connection:
```go
// implement UnmarshalerStream
-type ChannelStream chan *TestObj
+type ChannelStream chan *user
func (c ChannelStream) UnmarshalStream(dec *gojay.StreamDecoder) error {
- obj := &TestObj{}
- if err := dec.AddObject(obj); err != nil {
+ u := &user{}
+ if err := dec.AddObject(u); err != nil {
return err
}
- c <- obj
+ c <- u
return nil
}
@@ -513,10 +513,11 @@ func main() {
log.Fatal(err)
}
// create our channel which will receive our objects
- streamChan := ChannelStream(make(chan *TestObj))
- // get a reader implementing io.Reader
- dec := gojay.Stream.NewDecoder(ws)
- // start decoding (will block the goroutine until something is written to the ReadWriter)
+ streamChan := ChannelStream(make(chan *user))
+ // borrow a decoder
+ dec := gojay.Stream.BorrowDecoder(ws)
+ // start decoding, it will block until a JSON message is decoded from the WebSocket
+ // or until Done channel is closed
go dec.DecodeStream(streamChan)
for {
select {
diff --git a/encode_array.go b/encode_array.go
@@ -21,7 +21,7 @@ func (enc *Encoder) encodeArray(v MarshalerArray) ([]byte, error) {
return enc.buf, enc.err
}
-// AddArray adds an array or slice to be encoded, must be used inside a slice or array encoding (does not encode a key)
+// AddArray adds an implementation of MarshalerArray to be encoded, must be used inside a slice or array encoding (does not encode a key)
// value must implement Marshaler
func (enc *Encoder) AddArray(v MarshalerArray) {
if v.IsNil() {
diff --git a/encode_stream.go b/encode_stream.go
@@ -1,6 +1,9 @@
package gojay
-import "strconv"
+import (
+ "strconv"
+ "time"
+)
// MarshalerStream is the interface to implement
// to continuously encode of stream of data.
@@ -13,9 +16,9 @@ type MarshalerStream interface {
// It implements conext.Context and provide a channel to notify interruption.
type StreamEncoder struct {
*Encoder
- err error
nConsumer int
delimiter byte
+ deadline *time.Time
done chan struct{}
}
@@ -91,6 +94,26 @@ func (s *StreamEncoder) Err() error {
return s.err
}
+// Deadline returns the time when work done on behalf of this context
+// should be canceled. Deadline returns ok==false when no deadline is
+// set. Successive calls to Deadline return the same results.
+func (s *StreamEncoder) Deadline() (time.Time, bool) {
+ if s.deadline != nil {
+ return *s.deadline, true
+ }
+ return time.Time{}, false
+}
+
+// SetDeadline sets the deadline
+func (s *StreamEncoder) SetDeadline(t time.Time) {
+ s.deadline = &t
+}
+
+// Value implements context.Context
+func (s *StreamEncoder) Value(key interface{}) interface{} {
+ return nil
+}
+
// Cancel cancels the consumers of the stream, interrupting the stream encoding.
//
// After calling cancel, Done() will return a closed channel.
@@ -103,8 +126,8 @@ func (s *StreamEncoder) Cancel(err error) {
}
}
-// AddObject adds an object to be encoded, must be used inside a slice or array encoding (does not encode a key)
-// value must implement MarshalerObject
+// AddObject adds an object to be encoded.
+// value must implement MarshalerObject.
func (s *StreamEncoder) AddObject(v MarshalerObject) {
if v.IsNil() {
return
@@ -115,13 +138,29 @@ func (s *StreamEncoder) AddObject(v MarshalerObject) {
s.Encoder.writeByte(s.delimiter)
}
-// AddInt adds an int to be encoded, must be used inside a slice or array encoding (does not encode a key)
+// AddString adds a string to be encoded.
+func (s *StreamEncoder) AddString(v string) {
+ s.Encoder.writeByte('"')
+ s.Encoder.writeString(v)
+ s.Encoder.writeByte('"')
+ s.Encoder.writeByte(s.delimiter)
+}
+
+// AddArray adds an implementation of MarshalerArray to be encoded.
+func (s *StreamEncoder) AddArray(v MarshalerArray) {
+ s.Encoder.writeByte('[')
+ v.MarshalArray(s.Encoder)
+ s.Encoder.writeByte(']')
+ s.Encoder.writeByte(s.delimiter)
+}
+
+// AddInt adds an int to be encoded.
func (s *StreamEncoder) AddInt(value int) {
s.buf = strconv.AppendInt(s.buf, int64(value), 10)
s.Encoder.writeByte(s.delimiter)
}
-// AddFloat adds a float64 to be encoded, must be used inside a slice or array encoding (does not encode a key)
+// AddFloat adds a float64 to be encoded.
func (s *StreamEncoder) AddFloat(value float64) {
s.buf = strconv.AppendFloat(s.buf, value, 'f', -1, 64)
s.Encoder.writeByte(s.delimiter)
diff --git a/encode_stream_pool.go b/encode_stream_pool.go
@@ -20,7 +20,7 @@ func (s stream) BorrowEncoder(w io.Writer) *StreamEncoder {
case streamEnc := <-streamEncPool:
streamEnc.isPooled = 0
streamEnc.w = w
- streamEnc.err = nil
+ streamEnc.Encoder.err = nil
streamEnc.done = make(chan struct{}, 1)
streamEnc.Encoder.buf = make([]byte, 0, 512)
streamEnc.nConsumer = 1
@@ -35,6 +35,7 @@ func (s stream) borrowEncoder(w io.Writer) *StreamEncoder {
case streamEnc := <-streamEncPool:
streamEnc.isPooled = 0
streamEnc.w = w
+ streamEnc.Encoder.err = nil
return streamEnc
default:
return s.NewEncoder(w)
diff --git a/encode_stream_test.go b/encode_stream_test.go
@@ -1,8 +1,10 @@
package gojay
import (
+ "os"
"sync"
"testing"
+ "time"
"github.com/stretchr/testify/assert"
)
@@ -18,163 +20,58 @@ func (s StreamChanObject) MarshalStream(enc *StreamEncoder) {
}
}
-type StreamChanInt chan int
+type StreamChanSlice chan *TestEncodingArrStrings
-func (s StreamChanInt) MarshalStream(enc *StreamEncoder) {
+func (s StreamChanSlice) MarshalStream(enc *StreamEncoder) {
select {
case <-enc.Done():
return
case o := <-s:
- enc.AddInt(o)
+ enc.AddArray(o)
}
}
-type StreamChanFloat chan float64
+type StreamChanString chan string
-func (s StreamChanFloat) MarshalStream(enc *StreamEncoder) {
+func (s StreamChanString) MarshalStream(enc *StreamEncoder) {
select {
case <-enc.Done():
return
case o := <-s:
- enc.AddFloat(o)
+ enc.AddString(o)
}
}
-type StreamChanError chan *testObject
+type StreamChanInt chan int
-func (s StreamChanError) MarshalStream(enc *StreamEncoder) {
+func (s StreamChanInt) MarshalStream(enc *StreamEncoder) {
select {
case <-enc.Done():
return
- case <-s:
- enc.AddInterface(struct{}{})
+ case o := <-s:
+ enc.AddInt(o)
}
}
-func TestEncodeStreamSingleConsumer(t *testing.T) {
- expectedStr :=
- `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false}
-`
- // create our writer
- w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
- enc := Stream.NewEncoder(w).LineDelimited()
- w.enc = enc
- s := StreamChanObject(make(chan *testObject))
- go enc.EncodeStream(s)
- go feedStream(s, 100)
- select {
- case <-enc.Done():
- assert.Len(t, w.result, 100, "w.result should be 100")
- for _, b := range w.result {
- assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
- }
- }
-}
-func TestEncodeStreamSingleConsumerNilValue(t *testing.T) {
- expectedStr := ``
- // create our writer
- w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
- enc := Stream.NewEncoder(w).LineDelimited()
- w.enc = enc
- s := StreamChanObject(make(chan *testObject))
- go enc.EncodeStream(s)
- go feedStreamNil(s, 100)
- select {
- case <-enc.Done():
- assert.Nil(t, enc.Err(), "enc.Err() should not be nil")
- for _, b := range w.result {
- assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
- }
- }
-}
-func TestEncodeStreamSingleConsumerInt(t *testing.T) {
- // create our writer
- w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
- enc := Stream.NewEncoder(w).LineDelimited()
- w.enc = enc
- s := StreamChanInt(make(chan int))
- go enc.EncodeStream(s)
- go feedStreamInt(s, 100)
- select {
- case <-enc.Done():
- assert.Len(t, w.result, 100, "w.result should be 100")
- }
-}
-func TestEncodeStreamSingleConsumerFloat(t *testing.T) {
- // create our writer
- w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
- enc := Stream.NewEncoder(w).LineDelimited()
- w.enc = enc
- s := StreamChanFloat(make(chan float64))
- go enc.EncodeStream(s)
- go feedStreamFloat(s, 100)
- select {
- case <-enc.Done():
- assert.Len(t, w.result, 100, "w.result should be 100")
- }
-}
-func TestEncodeStreamSingleConsumerMarshalError(t *testing.T) {
- // create our writer
- w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
- enc := Stream.NewEncoder(w).LineDelimited()
- w.enc = enc
- s := StreamChanError(make(chan *testObject))
- go enc.EncodeStream(s)
- go feedStream(s, 100)
- select {
- case <-enc.Done():
- assert.NotNil(t, enc.Err(), "enc.Err() should not be nil")
- }
-}
+type StreamChanFloat chan float64
-func TestEncodeStreamSingleConsumerWriteError(t *testing.T) {
- // create our writer
- w := TestWriterError("")
- enc := Stream.NewEncoder(w).LineDelimited()
- s := StreamChanObject(make(chan *testObject))
- go enc.EncodeStream(s)
- go feedStream(s, 100)
- select {
- case <-enc.Done():
- assert.NotNil(t, enc.Err(), "enc.Err() should not be nil")
- }
-}
-func TestEncodeStreamSingleConsumerCommaDelimited(t *testing.T) {
- expectedStr :=
- `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false},`
- // create our writer
- w := &TestWriter{target: 5000, mux: &sync.RWMutex{}}
- enc := Stream.BorrowEncoder(w).NConsumer(50).CommaDelimited()
- w.enc = enc
- s := StreamChanObject(make(chan *testObject))
- go enc.EncodeStream(s)
- go feedStream(s, 5000)
+func (s StreamChanFloat) MarshalStream(enc *StreamEncoder) {
select {
case <-enc.Done():
- assert.Len(t, w.result, 5000, "w.result should be 100")
- for _, b := range w.result {
- assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
- }
+ return
+ case o := <-s:
+ enc.AddFloat(o)
}
}
-func TestEncodeStreamMultipleConsumer(t *testing.T) {
- expectedStr :=
- `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false}
-`
- // create our writer
- w := &TestWriter{target: 5000, mux: &sync.RWMutex{}}
- enc := Stream.NewEncoder(w).NConsumer(50).LineDelimited()
- w.enc = enc
- s := StreamChanObject(make(chan *testObject))
- go enc.EncodeStream(s)
- go feedStream(s, 5000)
+type StreamChanError chan *testObject
+
+func (s StreamChanError) MarshalStream(enc *StreamEncoder) {
select {
case <-enc.Done():
- assert.Len(t, w.result, 5000, "w.result should be 100")
- for _, b := range w.result {
- assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
- }
+ return
+ case <-s:
+ enc.AddInterface(struct{}{})
}
}
@@ -211,6 +108,18 @@ func feedStream(s chan *testObject, target int) {
}
}
+func feedStreamSlices(s chan *TestEncodingArrStrings, target int) {
+ for i := 0; i < target; i++ {
+ s <- &TestEncodingArrStrings{"test", "test2"}
+ }
+}
+
+func feedStreamStrings(s chan string, target int) {
+ for i := 0; i < target; i++ {
+ s <- "hello"
+ }
+}
+
func feedStreamInt(s chan int, target int) {
for i := 0; i < target; i++ {
s <- i
@@ -222,3 +131,199 @@ func feedStreamFloat(s chan float64, target int) {
s <- float64(i)
}
}
+
+func TestEncodeStream(t *testing.T) {
+ t.Run("single-consumer-object", func(t *testing.T) {
+ expectedStr :=
+ `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false}
+`
+ // create our writer
+ w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
+ enc := Stream.NewEncoder(w).LineDelimited()
+ w.enc = enc
+ s := StreamChanObject(make(chan *testObject))
+ go enc.EncodeStream(s)
+ go feedStream(s, 100)
+ select {
+ case <-enc.Done():
+ assert.Nil(t, enc.Err(), "enc.Err() should be nil")
+ assert.Len(t, w.result, 100, "w.result should be 100")
+ for _, b := range w.result {
+ assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
+ }
+ }
+ })
+
+ t.Run("single-consumer-slice", func(t *testing.T) {
+ expectedStr :=
+ `["test","test2"]
+`
+ // create our writer
+ w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
+ enc := Stream.NewEncoder(w).LineDelimited()
+ w.enc = enc
+ s := StreamChanSlice(make(chan *TestEncodingArrStrings))
+ go enc.EncodeStream(s)
+ go feedStreamSlices(s, 100)
+ select {
+ case <-enc.Done():
+ assert.Nil(t, enc.Err(), "enc.Err() should be nil")
+ assert.Len(t, w.result, 100, "w.result should be 100")
+ for _, b := range w.result {
+ assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
+ }
+ }
+ })
+
+ t.Run("single-consumer-string", func(t *testing.T) {
+ expectedStr :=
+ `"hello"
+`
+ // create our writer
+ w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
+ enc := Stream.NewEncoder(w).LineDelimited()
+ w.enc = enc
+ s := StreamChanString(make(chan string))
+ go enc.EncodeStream(s)
+ go feedStreamStrings(s, 100)
+ select {
+ case <-enc.Done():
+ assert.Nil(t, enc.Err(), "enc.Err() should be nil")
+ assert.Len(t, w.result, 100, "w.result should be 100")
+ for _, b := range w.result {
+ assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
+ }
+ }
+ })
+
+ t.Run("single-consumer-object-nil-value", func(t *testing.T) {
+ expectedStr := ``
+ // create our writer
+ w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
+ enc := Stream.NewEncoder(w).LineDelimited()
+ w.enc = enc
+ s := StreamChanObject(make(chan *testObject))
+ go enc.EncodeStream(s)
+ go feedStreamNil(s, 100)
+ select {
+ case <-enc.Done():
+ assert.Nil(t, enc.Err(), "enc.Err() should be nil")
+ assert.Nil(t, enc.Err(), "enc.Err() should not be nil")
+ for _, b := range w.result {
+ assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
+ }
+ }
+ })
+
+ t.Run("single-consumer-int", func(t *testing.T) {
+ // create our writer
+ w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
+ enc := Stream.NewEncoder(w).LineDelimited()
+ w.enc = enc
+ s := StreamChanInt(make(chan int))
+ go enc.EncodeStream(s)
+ go feedStreamInt(s, 100)
+ select {
+ case <-enc.Done():
+ assert.Nil(t, enc.Err(), "enc.Err() should be nil")
+ assert.Len(t, w.result, 100, "w.result should be 100")
+ }
+ })
+
+ t.Run("single-consumer-float", func(t *testing.T) {
+ // create our writer
+ w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
+ enc := Stream.NewEncoder(w).LineDelimited()
+ w.enc = enc
+ s := StreamChanFloat(make(chan float64))
+ go enc.EncodeStream(s)
+ go feedStreamFloat(s, 100)
+ select {
+ case <-enc.Done():
+ assert.Nil(t, enc.Err(), "enc.Err() should be nil")
+ assert.Len(t, w.result, 100, "w.result should be 100")
+ }
+ })
+
+ t.Run("single-consumer-marshal-error", func(t *testing.T) {
+ // create our writer
+ w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
+ enc := Stream.NewEncoder(w).LineDelimited()
+ w.enc = enc
+ s := StreamChanError(make(chan *testObject))
+ go enc.EncodeStream(s)
+ go feedStream(s, 100)
+ select {
+ case <-enc.Done():
+ assert.NotNil(t, enc.Err(), "enc.Err() should not be nil")
+ }
+ })
+
+ t.Run("single-consumer-write-error", func(t *testing.T) {
+ // create our writer
+ w := TestWriterError("")
+ enc := Stream.NewEncoder(w).LineDelimited()
+ s := StreamChanObject(make(chan *testObject))
+ go enc.EncodeStream(s)
+ go feedStream(s, 100)
+ select {
+ case <-enc.Done():
+ assert.NotNil(t, enc.Err(), "enc.Err() should not be nil")
+ }
+ })
+
+ t.Run("multiple-consumer-object-comma-delimited", func(t *testing.T) {
+ expectedStr :=
+ `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false},`
+ // create our writer
+ w := &TestWriter{target: 5000, mux: &sync.RWMutex{}}
+ enc := Stream.BorrowEncoder(w).NConsumer(50).CommaDelimited()
+ w.enc = enc
+ s := StreamChanObject(make(chan *testObject))
+ go enc.EncodeStream(s)
+ go feedStream(s, 5000)
+ select {
+ case <-enc.Done():
+ assert.Nil(t, enc.Err(), "enc.Err() should be nil")
+ assert.Len(t, w.result, 5000, "w.result should be 100")
+ for _, b := range w.result {
+ assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
+ }
+ }
+ })
+
+ t.Run("multiple-consumer-object-line-delimited", func(t *testing.T) {
+ expectedStr :=
+ `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false}
+`
+ // create our writer
+ w := &TestWriter{target: 5000, mux: &sync.RWMutex{}}
+ enc := Stream.NewEncoder(w).NConsumer(50).LineDelimited()
+ w.enc = enc
+ s := StreamChanObject(make(chan *testObject))
+ go enc.EncodeStream(s)
+ go feedStream(s, 5000)
+ select {
+ case <-enc.Done():
+ assert.Nil(t, enc.Err(), "enc.Err() should be nil")
+ assert.Len(t, w.result, 5000, "w.result should be 100")
+ for _, b := range w.result {
+ assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
+ }
+ }
+ })
+
+ t.Run("encoder-deadline", func(t *testing.T) {
+ enc := Stream.NewEncoder(os.Stdout)
+ now := time.Now()
+ enc.SetDeadline(now)
+ d, _ := enc.Deadline()
+ assert.Equal(t, now, d, "deadline should be the one just set")
+ })
+
+ // just for coverage
+ t.Run("encoder-context-value", func(t *testing.T) {
+ enc := Stream.NewEncoder(os.Stdout)
+ assert.Nil(t, enc.Value(""), "enc.Value should be nil")
+ })
+}