gojay

high performance JSON encoder/decoder with stream API for Golang
git clone git://git.lair.cx/gojay
Log | Files | Refs | README | LICENSE

encode_stream_test.go (9256B)


      1 package gojay
      2 
      3 import (
      4 	"os"
      5 	"sync"
      6 	"testing"
      7 	"time"
      8 
      9 	"github.com/stretchr/testify/assert"
     10 )
     11 
     12 type StreamChanObject chan *testObject
     13 
     14 func (s StreamChanObject) MarshalStream(enc *StreamEncoder) {
     15 	select {
     16 	case <-enc.Done():
     17 		return
     18 	case o := <-s:
     19 		enc.AddObject(o)
     20 	}
     21 }
     22 
     23 type StreamChanSlice chan *TestEncodingArrStrings
     24 
     25 func (s StreamChanSlice) MarshalStream(enc *StreamEncoder) {
     26 	select {
     27 	case <-enc.Done():
     28 		return
     29 	case o := <-s:
     30 		enc.AddArray(o)
     31 	}
     32 }
     33 
     34 type StreamChanString chan string
     35 
     36 func (s StreamChanString) MarshalStream(enc *StreamEncoder) {
     37 	select {
     38 	case <-enc.Done():
     39 		return
     40 	case o := <-s:
     41 		enc.AddString(o)
     42 	}
     43 }
     44 
     45 type StreamChanInt chan int
     46 
     47 func (s StreamChanInt) MarshalStream(enc *StreamEncoder) {
     48 	select {
     49 	case <-enc.Done():
     50 		return
     51 	case o := <-s:
     52 		enc.AddInt(o)
     53 	}
     54 }
     55 
     56 type StreamChanFloat chan float64
     57 
     58 func (s StreamChanFloat) MarshalStream(enc *StreamEncoder) {
     59 	select {
     60 	case <-enc.Done():
     61 		return
     62 	case o := <-s:
     63 		enc.AddFloat(o)
     64 	}
     65 }
     66 
     67 type StreamChanError chan *testObject
     68 
     69 func (s StreamChanError) MarshalStream(enc *StreamEncoder) {
     70 	select {
     71 	case <-enc.Done():
     72 		return
     73 	case <-s:
     74 		enc.AddInterface(struct{}{})
     75 	}
     76 }
     77 
     78 // TestWriter to assert result
     79 type TestWriter struct {
     80 	nWrite *int
     81 	target int
     82 	enc    *StreamEncoder
     83 	result [][]byte
     84 	mux    *sync.RWMutex
     85 }
     86 
     87 func (w *TestWriter) Write(b []byte) (int, error) {
     88 	if len(b) > 0 {
     89 		w.mux.Lock()
     90 		w.result = append(w.result, b)
     91 		if len(w.result) == w.target {
     92 			w.enc.Cancel(nil)
     93 		}
     94 		w.mux.Unlock()
     95 	}
     96 	return len(b), nil
     97 }
     98 
     99 func feedStreamNil(s chan *testObject, target int) {
    100 	for i := 0; i < target; i++ {
    101 		s <- nil
    102 	}
    103 }
    104 
    105 func feedStream(s chan *testObject, target int) {
    106 	for i := 0; i < target; i++ {
    107 		s <- &testObject{}
    108 	}
    109 }
    110 
    111 func feedStreamSlices(s chan *TestEncodingArrStrings, target int) {
    112 	for i := 0; i < target; i++ {
    113 		s <- &TestEncodingArrStrings{"test", "test2"}
    114 	}
    115 }
    116 
    117 func feedStreamStrings(s chan string, target int) {
    118 	for i := 0; i < target; i++ {
    119 		s <- "hello"
    120 	}
    121 }
    122 
    123 func feedStreamInt(s chan int, target int) {
    124 	for i := 0; i < target; i++ {
    125 		s <- i
    126 	}
    127 }
    128 
    129 func feedStreamFloat(s chan float64, target int) {
    130 	for i := 0; i < target; i++ {
    131 		s <- float64(i)
    132 	}
    133 }
    134 
    135 func TestEncodeStream(t *testing.T) {
    136 	t.Run("single-consumer-object", func(t *testing.T) {
    137 		expectedStr :=
    138 			`{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false}
    139 `
    140 		// create our writer
    141 		w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
    142 		enc := Stream.NewEncoder(w).LineDelimited()
    143 		w.enc = enc
    144 		s := StreamChanObject(make(chan *testObject))
    145 		go enc.EncodeStream(s)
    146 		go feedStream(s, 100)
    147 		select {
    148 		case <-enc.Done():
    149 			assert.Nil(t, enc.Err(), "enc.Err() should be nil")
    150 			assert.Len(t, w.result, 100, "w.result should be 100")
    151 			for _, b := range w.result {
    152 				assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
    153 			}
    154 		}
    155 	})
    156 
    157 	t.Run("single-consumer-slice", func(t *testing.T) {
    158 		expectedStr :=
    159 			`["test","test2"]
    160 `
    161 		// create our writer
    162 		w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
    163 		enc := Stream.NewEncoder(w).LineDelimited()
    164 		w.enc = enc
    165 		s := StreamChanSlice(make(chan *TestEncodingArrStrings))
    166 		go enc.EncodeStream(s)
    167 		go feedStreamSlices(s, 100)
    168 		select {
    169 		case <-enc.Done():
    170 			assert.Nil(t, enc.Err(), "enc.Err() should be nil")
    171 			assert.Len(t, w.result, 100, "w.result should be 100")
    172 			for _, b := range w.result {
    173 				assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
    174 			}
    175 		}
    176 	})
    177 
    178 	t.Run("single-consumer-string", func(t *testing.T) {
    179 		expectedStr :=
    180 			`"hello"
    181 `
    182 		// create our writer
    183 		w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
    184 		enc := Stream.NewEncoder(w).LineDelimited()
    185 		w.enc = enc
    186 		s := StreamChanString(make(chan string))
    187 		go enc.EncodeStream(s)
    188 		go feedStreamStrings(s, 100)
    189 		select {
    190 		case <-enc.Done():
    191 			assert.Nil(t, enc.Err(), "enc.Err() should be nil")
    192 			assert.Len(t, w.result, 100, "w.result should be 100")
    193 			for _, b := range w.result {
    194 				assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
    195 			}
    196 		}
    197 	})
    198 
    199 	t.Run("single-consumer-object-nil-value", func(t *testing.T) {
    200 		expectedStr := ``
    201 		// create our writer
    202 		w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
    203 		enc := Stream.NewEncoder(w).LineDelimited()
    204 		w.enc = enc
    205 		s := StreamChanObject(make(chan *testObject))
    206 		go enc.EncodeStream(s)
    207 		go feedStreamNil(s, 100)
    208 		select {
    209 		case <-enc.Done():
    210 			assert.Nil(t, enc.Err(), "enc.Err() should be nil")
    211 			assert.Nil(t, enc.Err(), "enc.Err() should not be nil")
    212 			for _, b := range w.result {
    213 				assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
    214 			}
    215 		}
    216 	})
    217 
    218 	t.Run("single-consumer-int", func(t *testing.T) {
    219 		// create our writer
    220 		w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
    221 		enc := Stream.NewEncoder(w).LineDelimited()
    222 		w.enc = enc
    223 		s := StreamChanInt(make(chan int))
    224 		go enc.EncodeStream(s)
    225 		go feedStreamInt(s, 100)
    226 		select {
    227 		case <-enc.Done():
    228 			assert.Nil(t, enc.Err(), "enc.Err() should be nil")
    229 			assert.Len(t, w.result, 100, "w.result should be 100")
    230 		}
    231 	})
    232 
    233 	t.Run("single-consumer-float", func(t *testing.T) {
    234 		// create our writer
    235 		w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
    236 		enc := Stream.NewEncoder(w).LineDelimited()
    237 		w.enc = enc
    238 		s := StreamChanFloat(make(chan float64))
    239 		go enc.EncodeStream(s)
    240 		go feedStreamFloat(s, 100)
    241 		select {
    242 		case <-enc.Done():
    243 			assert.Nil(t, enc.Err(), "enc.Err() should be nil")
    244 			assert.Len(t, w.result, 100, "w.result should be 100")
    245 		}
    246 	})
    247 
    248 	t.Run("single-consumer-marshal-error", func(t *testing.T) {
    249 		// create our writer
    250 		w := &TestWriter{target: 100, mux: &sync.RWMutex{}}
    251 		enc := Stream.NewEncoder(w).LineDelimited()
    252 		w.enc = enc
    253 		s := StreamChanError(make(chan *testObject))
    254 		go enc.EncodeStream(s)
    255 		go feedStream(s, 100)
    256 		select {
    257 		case <-enc.Done():
    258 			assert.NotNil(t, enc.Err(), "enc.Err() should not be nil")
    259 		}
    260 	})
    261 
    262 	t.Run("single-consumer-write-error", func(t *testing.T) {
    263 		// create our writer
    264 		w := TestWriterError("")
    265 		enc := Stream.NewEncoder(w).LineDelimited()
    266 		s := StreamChanObject(make(chan *testObject))
    267 		go enc.EncodeStream(s)
    268 		go feedStream(s, 100)
    269 		select {
    270 		case <-enc.Done():
    271 			assert.NotNil(t, enc.Err(), "enc.Err() should not be nil")
    272 		}
    273 	})
    274 
    275 	t.Run("multiple-consumer-object-comma-delimited", func(t *testing.T) {
    276 		expectedStr :=
    277 			`{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false},`
    278 		// create our writer
    279 		w := &TestWriter{target: 5000, mux: &sync.RWMutex{}}
    280 		enc := Stream.BorrowEncoder(w).NConsumer(50).CommaDelimited()
    281 		w.enc = enc
    282 		s := StreamChanObject(make(chan *testObject))
    283 		go enc.EncodeStream(s)
    284 		go feedStream(s, 5000)
    285 		select {
    286 		case <-enc.Done():
    287 			assert.Nil(t, enc.Err(), "enc.Err() should be nil")
    288 			assert.Len(t, w.result, 5000, "w.result should be 100")
    289 			for _, b := range w.result {
    290 				assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
    291 			}
    292 		}
    293 	})
    294 
    295 	t.Run("multiple-consumer-object-line-delimited", func(t *testing.T) {
    296 		expectedStr :=
    297 			`{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false}
    298 `
    299 		// create our writer
    300 		w := &TestWriter{target: 5000, mux: &sync.RWMutex{}}
    301 		enc := Stream.NewEncoder(w).NConsumer(50).LineDelimited()
    302 		w.enc = enc
    303 		s := StreamChanObject(make(chan *testObject))
    304 		go feedStream(s, 5000)
    305 		go enc.EncodeStream(s)
    306 		select {
    307 		case <-enc.Done():
    308 			assert.Nil(t, enc.Err(), "enc.Err() should be nil")
    309 			assert.Len(t, w.result, 5000, "w.result should be 100")
    310 			for _, b := range w.result {
    311 				assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string")
    312 			}
    313 		}
    314 	})
    315 
    316 	t.Run("multiple-consumer-object-chan-closed", func(t *testing.T) {
    317 		// create our writer
    318 		w := &TestWriter{target: 5000, mux: &sync.RWMutex{}}
    319 		enc := Stream.NewEncoder(w).NConsumer(50).LineDelimited()
    320 		w.enc = enc
    321 		s := StreamChanObject(make(chan *testObject))
    322 		close(enc.done)
    323 		go feedStream(s, 5000)
    324 		go enc.EncodeStream(s)
    325 		select {
    326 		case <-enc.Done():
    327 			assert.Nil(t, enc.Err(), "enc.Err() should be nil")
    328 			assert.Len(t, w.result, 0, "w.result should be 0")
    329 		}
    330 	})
    331 
    332 	t.Run("encoder-deadline", func(t *testing.T) {
    333 		enc := Stream.NewEncoder(os.Stdout)
    334 		now := time.Now()
    335 		enc.SetDeadline(now)
    336 		d, _ := enc.Deadline()
    337 		assert.Equal(t, now, d, "deadline should be the one just set")
    338 	})
    339 
    340 	t.Run("encoder-deadline-unset", func(t *testing.T) {
    341 		enc := Stream.NewEncoder(os.Stdout)
    342 		d, _ := enc.Deadline()
    343 		assert.Equal(t, time.Time{}, d, "deadline should be the one just set")
    344 	})
    345 
    346 	// just for coverage
    347 	t.Run("encoder-context-value", func(t *testing.T) {
    348 		enc := Stream.NewEncoder(os.Stdout)
    349 		assert.Nil(t, enc.Value(""), "enc.Value should be nil")
    350 	})
    351 }