decode_stream_test.go (13937B)
1 package gojay 2 3 import ( 4 "context" 5 "errors" 6 "io" 7 "testing" 8 "time" 9 10 "github.com/stretchr/testify/assert" 11 ) 12 13 // Basic Behaviour Tests 14 // 15 func TestDecoderImplementsContext(t *testing.T) { 16 var dec interface{} = &StreamDecoder{} 17 _ = dec.(context.Context) 18 } 19 20 func TestDecodeStreamNoReader(t *testing.T) { 21 dec := Stream.NewDecoder(nil) 22 dec.done = make(chan struct{}, 1) 23 testChan := ChannelStreamObjects(make(chan *TestObj)) 24 go dec.DecodeStream(&testChan) 25 26 select { 27 case <-dec.Done(): 28 assert.NotNil(t, dec.Err(), "dec.Err() should not be nil") 29 assert.Equal(t, "No reader given to decode stream", dec.Err().Error(), "dec.Err().Error() should not be 'No reader given to decode stream'") 30 case <-testChan: 31 assert.True(t, false, "should not be called as decoder should not return error right away") 32 } 33 } 34 35 // Table Tests 36 37 // Objects 38 39 type StreamTestObject struct { 40 name string 41 streamReader *StreamReader 42 expectations func(error, []*TestObj, *testing.T) 43 } 44 45 func TestStreamDecodingObjectsParallel(t *testing.T) { 46 var tests = []StreamTestObject{ 47 { 48 name: "Stream objects", 49 streamReader: &StreamReader{ 50 readChan: make(chan string), 51 done: make(chan struct{}), 52 data: ` 53 {"test":246,"test2":-246,"test3":"string"} 54 {"test":247,"test2":248,"test3":"string"} 55 {"test":777,"test2":456,"test3":"string"} 56 {"test":777,"test2":456,"test3":"string"} 57 {"test":777,"test2":456,"test3":"string"} 58 {"test":777,"test2":456,"test3":"string"} 59 `, 60 }, 61 expectations: func(err error, result []*TestObj, t *testing.T) { 62 assert.Nil(t, err, "err should be nil") 63 64 assert.Equal(t, 246, result[0].test, "v[0].test should be equal to 246") 65 assert.Equal(t, -246, result[0].test2, "v[0].test2 should be equal to -247") 66 assert.Equal(t, "string", result[0].test3, "v[0].test3 should be equal to \"string\"") 67 68 assert.Equal(t, 247, result[1].test, "result[1].test should be equal to 246") 69 assert.Equal(t, 248, result[1].test2, "result[1].test2 should be equal to 248") 70 assert.Equal(t, "string", result[1].test3, "result[1].test3 should be equal to \"string\"") 71 72 assert.Equal(t, 777, result[2].test, "result[2].test should be equal to 777") 73 assert.Equal(t, 456, result[2].test2, "result[2].test2 should be equal to 456") 74 assert.Equal(t, "string", result[2].test3, "result[2].test3 should be equal to \"string\"") 75 76 assert.Equal(t, 777, result[3].test, "result[3].test should be equal to 777") 77 assert.Equal(t, 456, result[3].test2, "result[3].test2 should be equal to 456") 78 assert.Equal(t, "string", result[3].test3, "result[3].test3 should be equal to \"string\"") 79 80 assert.Equal(t, 777, result[4].test, "result[4].test should be equal to 777") 81 assert.Equal(t, 456, result[4].test2, "result[4].test2 should be equal to 456") 82 assert.Equal(t, "string", result[4].test3, "result[4].test3 should be equal to \"string\"") 83 84 assert.Equal(t, 777, result[5].test, "result[5].test should be equal to 777") 85 assert.Equal(t, 456, result[5].test2, "result[5].test2 should be equal to 456") 86 assert.Equal(t, "string", result[5].test3, "result[5].test3 should be equal to \"string\"") 87 }, 88 }, 89 { 90 name: "Stream test objects with null values", 91 streamReader: &StreamReader{ 92 readChan: make(chan string), 93 done: make(chan struct{}), 94 data: ` 95 {"test":246,"test2":-246,"test3":"string"} 96 {"test":247,"test2":248,"test3":"string"} 97 null 98 {"test":777,"test2":456,"test3":"string"} 99 {"test":777,"test2":456,"test3":"string"} 100 {"test":777,"test2":456,"test3":"string"} 101 `, 102 }, 103 expectations: func(err error, result []*TestObj, t *testing.T) { 104 assert.Nil(t, err, "err should be nil") 105 106 assert.Equal(t, 246, result[0].test, "v[0].test should be equal to 246") 107 assert.Equal(t, -246, result[0].test2, "v[0].test2 should be equal to -247") 108 assert.Equal(t, "string", result[0].test3, "v[0].test3 should be equal to \"string\"") 109 110 assert.Equal(t, 247, result[1].test, "result[1].test should be equal to 246") 111 assert.Equal(t, 248, result[1].test2, "result[1].test2 should be equal to 248") 112 assert.Equal(t, "string", result[1].test3, "result[1].test3 should be equal to \"string\"") 113 114 assert.Equal(t, 0, result[2].test, "result[2].test should be equal to 0 as input is null") 115 assert.Equal(t, 0, result[2].test2, "result[2].test2 should be equal to 0 as input is null") 116 assert.Equal(t, "", result[2].test3, "result[2].test3 should be equal to \"\" as input is null") 117 118 assert.Equal(t, 777, result[3].test, "result[3].test should be equal to 777") 119 assert.Equal(t, 456, result[3].test2, "result[3].test2 should be equal to 456") 120 assert.Equal(t, "string", result[3].test3, "result[3].test3 should be equal to \"string\"") 121 122 assert.Equal(t, 777, result[4].test, "result[4].test should be equal to 777") 123 assert.Equal(t, 456, result[4].test2, "result[4].test2 should be equal to 456") 124 assert.Equal(t, "string", result[4].test3, "result[4].test3 should be equal to \"string\"") 125 126 assert.Equal(t, 777, result[5].test, "result[5].test should be equal to 777") 127 assert.Equal(t, 456, result[5].test2, "result[5].test2 should be equal to 456") 128 assert.Equal(t, "string", result[5].test3, "result[5].test3 should be equal to \"string\"") 129 }, 130 }, 131 { 132 name: "Stream test starting with null values", 133 streamReader: &StreamReader{ 134 readChan: make(chan string), 135 done: make(chan struct{}), 136 data: ` 137 null 138 {"test":246,"test2":-246,"test3":"string"} 139 {"test":247,"test2":248,"test3":"string"} 140 `, 141 }, 142 expectations: func(err error, result []*TestObj, t *testing.T) { 143 assert.Nil(t, err, "err should be nil") 144 assert.Equal(t, 0, result[0].test, "result[0].test should be equal to 0 as input is null") 145 assert.Equal(t, 0, result[0].test2, "result[0].test2 should be equal to 0 as input is null") 146 assert.Equal(t, "", result[0].test3, "result[0].test3 should be equal to \"\" as input is null") 147 148 assert.Equal(t, 246, result[1].test, "v[1].test should be equal to 246") 149 assert.Equal(t, -246, result[1].test2, "v[1].test2 should be equal to -247") 150 assert.Equal(t, "string", result[1].test3, "v[1].test3 should be equal to \"string\"") 151 152 assert.Equal(t, 247, result[2].test, "result[2].test should be equal to 246") 153 assert.Equal(t, 248, result[2].test2, "result[2].test2 should be equal to 248") 154 assert.Equal(t, "string", result[2].test3, "result[2].test3 should be equal to \"string\"") 155 }, 156 }, 157 { 158 name: "Stream test invalid JSON", 159 streamReader: &StreamReader{ 160 readChan: make(chan string), 161 done: make(chan struct{}), 162 data: ` 163 invalid json 164 {"test":246,"test2":-246,"test3":"string"} 165 {"test":247,"test2":248,"test3":"string"} 166 `, 167 }, 168 expectations: func(err error, result []*TestObj, t *testing.T) { 169 assert.NotNil(t, err, "err is not nil as JSON is invalid") 170 assert.IsType(t, InvalidJSONError(""), err, "err is of type InvalidJSONError") 171 assert.Equal(t, "Invalid JSON, wrong char 'i' found at position 6", err.Error(), "err message is Invalid JSON") 172 }, 173 }, 174 } 175 for _, testCase := range tests { 176 testCase := testCase 177 t.Run(testCase.name, func(t *testing.T) { 178 t.Parallel() 179 runStreamTestCaseObjects(t, testCase) 180 }) 181 } 182 } 183 184 func runStreamTestCaseObjects(t *testing.T, testCase StreamTestObject) { 185 // create our channel which will receive our objects 186 testChan := ChannelStreamObjects(make(chan *TestObj)) 187 dec := Stream.NewDecoder(testCase.streamReader) 188 // start decoding (will block the goroutine until something is written to the ReadWriter) 189 go dec.DecodeStream(&testChan) 190 // start writing to the ReadWriter 191 go testCase.streamReader.Write() 192 // prepare our result 193 result := []*TestObj{} 194 loop: 195 for { 196 select { 197 case v := <-testChan: 198 result = append(result, v) 199 case <-dec.Done(): 200 break loop 201 } 202 } 203 testCase.expectations(dec.Err(), result, t) 204 } 205 206 type ChannelStreamObjects chan *TestObj 207 208 func (c *ChannelStreamObjects) UnmarshalStream(dec *StreamDecoder) error { 209 obj := &TestObj{} 210 if err := dec.AddObject(obj); err != nil { 211 return err 212 } 213 *c <- obj 214 return nil 215 } 216 217 // Strings 218 type StreamTestString struct { 219 name string 220 streamReader *StreamReader 221 expectations func(error, []*string, *testing.T) 222 } 223 224 func TestStreamDecodingStringsParallel(t *testing.T) { 225 var tests = []StreamTestString{ 226 { 227 name: "Stream strings basic", 228 streamReader: &StreamReader{ 229 readChan: make(chan string), 230 done: make(chan struct{}), 231 data: ` 232 "hello" 233 "world" 234 "!" 235 `, 236 }, 237 expectations: func(err error, result []*string, t *testing.T) { 238 assert.Nil(t, err, "err should be nil") 239 240 assert.Equal(t, "hello", *result[0], "v[0] should be equal to 'hello'") 241 assert.Equal(t, "world", *result[1], "v[1] should be equal to 'world'") 242 assert.Equal(t, "!", *result[2], "v[2] should be equal to '!'") 243 }, 244 }, 245 { 246 name: "Stream strings with null", 247 streamReader: &StreamReader{ 248 readChan: make(chan string), 249 done: make(chan struct{}), 250 data: ` 251 "hello" 252 null 253 "!" 254 `, 255 }, 256 expectations: func(err error, result []*string, t *testing.T) { 257 assert.Nil(t, err, "err should be nil") 258 259 assert.Equal(t, "hello", *result[0], "v[0] should be equal to 'hello'") 260 assert.Equal(t, "", *result[1], "v[1] should be equal to ''") 261 assert.Equal(t, "!", *result[2], "v[2] should be equal to '!'") 262 }, 263 }, 264 { 265 name: "Stream strings invalid JSON", 266 streamReader: &StreamReader{ 267 readChan: make(chan string), 268 done: make(chan struct{}), 269 data: ` 270 "hello" 271 world 272 "!" 273 `, 274 }, 275 expectations: func(err error, result []*string, t *testing.T) { 276 assert.NotNil(t, err, "err should not be nil") 277 278 assert.IsType(t, InvalidJSONError(""), err, "err is of type InvalidJSONError") 279 assert.Equal(t, "Invalid JSON, wrong char 'w' found at position 6", err.Error(), "err message is Invalid JSON") 280 }, 281 }, 282 } 283 for _, testCase := range tests { 284 testCase := testCase 285 t.Run(testCase.name, func(t *testing.T) { 286 t.Parallel() 287 runStreamTestCaseStrings(t, testCase) 288 }) 289 } 290 } 291 292 func runStreamTestCaseStrings(t *testing.T, testCase StreamTestString) { 293 // create our channel which will receive our objects 294 testChan := ChannelStreamStrings(make(chan *string)) 295 dec := Stream.NewDecoder(testCase.streamReader) 296 // start decoding (will block the goroutine until something is written to the ReadWriter) 297 go dec.DecodeStream(testChan) 298 // start writing to the ReadWriter 299 go testCase.streamReader.Write() 300 // prepare our result 301 result := []*string{} 302 loop: 303 for { 304 select { 305 case v := <-testChan: 306 result = append(result, v) 307 case <-dec.Done(): 308 break loop 309 } 310 } 311 testCase.expectations(dec.Err(), result, t) 312 } 313 314 func TestStreamDecodingErr(t *testing.T) { 315 testChan := ChannelStreamStrings(make(chan *string)) 316 dec := Stream.NewDecoder(&StreamReaderErr{}) 317 // start decoding (will block the goroutine until something is written to the ReadWriter) 318 go dec.DecodeStream(testChan) 319 select { 320 case <-dec.Done(): 321 assert.NotNil(t, dec.Err(), "dec.Err() should not be nil") 322 case <-testChan: 323 assert.True(t, false, "should not be called") 324 } 325 326 } 327 328 type ChannelStreamStrings chan *string 329 330 func (c ChannelStreamStrings) UnmarshalStream(dec *StreamDecoder) error { 331 str := "" 332 if err := dec.AddString(&str); err != nil { 333 return err 334 } 335 c <- &str 336 return nil 337 } 338 339 // StreamReader mocks a stream reading chunks of data 340 type StreamReader struct { 341 writeCounter int 342 readChan chan string 343 done chan struct{} 344 data string 345 } 346 347 func (r *StreamReader) Write() { 348 l := len(r.data) 349 t := 4 350 chunkSize := l / t 351 carry := 0 352 lastWrite := 0 353 for r.writeCounter < t { 354 time.Sleep(time.Duration(r.writeCounter*100) * time.Millisecond) 355 currentChunkStart := (chunkSize) * r.writeCounter 356 lastWrite = currentChunkStart + chunkSize 357 r.readChan <- r.data[currentChunkStart:lastWrite] 358 carry = l - lastWrite 359 r.writeCounter++ 360 } 361 if carry > 0 { 362 r.readChan <- r.data[lastWrite:] 363 } 364 r.done <- struct{}{} 365 } 366 367 func (r *StreamReader) Read(b []byte) (int, error) { 368 select { 369 case v := <-r.readChan: 370 n := copy(b, v) 371 return n, nil 372 case <-r.done: 373 return 0, io.EOF 374 } 375 } 376 377 type StreamReaderErr struct{} 378 379 func (r *StreamReaderErr) Read(b []byte) (int, error) { 380 return 0, errors.New("Test Error") 381 } 382 383 // Deadline test 384 func TestStreamDecodingDeadline(t *testing.T) { 385 dec := Stream.NewDecoder(&StreamReader{}) 386 now := time.Now() 387 dec.SetDeadline(now) 388 deadline, _ := dec.Deadline() 389 assert.Equal(t, now.String(), deadline.String(), "dec.now and now should be equal") 390 assert.Equal(t, now.String(), dec.deadline.String(), "dec.now and now should be equal") 391 } 392 393 func TestStreamDecodingDeadlineNotSet(t *testing.T) { 394 dec := Stream.NewDecoder(&StreamReader{}) 395 _, isSet := dec.Deadline() 396 assert.Equal(t, false, isSet, "isSet should be false as deadline is not set") 397 } 398 399 // this test is only relevant for coverage 400 func TestStreamDecodingValue(t *testing.T) { 401 dec := Stream.NewDecoder(&StreamReader{}) 402 v := dec.Value("") 403 assert.Nil(t, v, "v should be nil") 404 } 405 406 func TestStreamDecodingErrNotSet(t *testing.T) { 407 dec := Stream.NewDecoder(&StreamReader{}) 408 assert.Nil(t, dec.Err(), "dec.Err should be nim") 409 } 410 411 func TestStreamDecodingPoolError(t *testing.T) { 412 dec := Stream.BorrowDecoder(nil) 413 dec.Release() 414 defer func() { 415 err := recover() 416 assert.NotNil(t, err, "err shouldnt be nil") 417 assert.IsType(t, InvalidUsagePooledDecoderError(""), err, "err should be of type InvalidUsagePooledEncoderError") 418 assert.Equal(t, "Invalid usage of pooled decoder", err.(InvalidUsagePooledDecoderError).Error(), "err should be of type InvalidUsagePooledDecoderError") 419 }() 420 testChan := ChannelStreamStrings(make(chan *string)) 421 _ = dec.DecodeStream(testChan) 422 assert.True(t, false, "should not be called as it should have panicked") 423 }