encode_stream_test.go (9256B)
1 package gojay 2 3 import ( 4 "os" 5 "sync" 6 "testing" 7 "time" 8 9 "github.com/stretchr/testify/assert" 10 ) 11 12 type StreamChanObject chan *testObject 13 14 func (s StreamChanObject) MarshalStream(enc *StreamEncoder) { 15 select { 16 case <-enc.Done(): 17 return 18 case o := <-s: 19 enc.AddObject(o) 20 } 21 } 22 23 type StreamChanSlice chan *TestEncodingArrStrings 24 25 func (s StreamChanSlice) MarshalStream(enc *StreamEncoder) { 26 select { 27 case <-enc.Done(): 28 return 29 case o := <-s: 30 enc.AddArray(o) 31 } 32 } 33 34 type StreamChanString chan string 35 36 func (s StreamChanString) MarshalStream(enc *StreamEncoder) { 37 select { 38 case <-enc.Done(): 39 return 40 case o := <-s: 41 enc.AddString(o) 42 } 43 } 44 45 type StreamChanInt chan int 46 47 func (s StreamChanInt) MarshalStream(enc *StreamEncoder) { 48 select { 49 case <-enc.Done(): 50 return 51 case o := <-s: 52 enc.AddInt(o) 53 } 54 } 55 56 type StreamChanFloat chan float64 57 58 func (s StreamChanFloat) MarshalStream(enc *StreamEncoder) { 59 select { 60 case <-enc.Done(): 61 return 62 case o := <-s: 63 enc.AddFloat(o) 64 } 65 } 66 67 type StreamChanError chan *testObject 68 69 func (s StreamChanError) MarshalStream(enc *StreamEncoder) { 70 select { 71 case <-enc.Done(): 72 return 73 case <-s: 74 enc.AddInterface(struct{}{}) 75 } 76 } 77 78 // TestWriter to assert result 79 type TestWriter struct { 80 nWrite *int 81 target int 82 enc *StreamEncoder 83 result [][]byte 84 mux *sync.RWMutex 85 } 86 87 func (w *TestWriter) Write(b []byte) (int, error) { 88 if len(b) > 0 { 89 w.mux.Lock() 90 w.result = append(w.result, b) 91 if len(w.result) == w.target { 92 w.enc.Cancel(nil) 93 } 94 w.mux.Unlock() 95 } 96 return len(b), nil 97 } 98 99 func feedStreamNil(s chan *testObject, target int) { 100 for i := 0; i < target; i++ { 101 s <- nil 102 } 103 } 104 105 func feedStream(s chan *testObject, target int) { 106 for i := 0; i < target; i++ { 107 s <- &testObject{} 108 } 109 } 110 111 func feedStreamSlices(s chan *TestEncodingArrStrings, target int) { 112 for i := 0; i < target; i++ { 113 s <- &TestEncodingArrStrings{"test", "test2"} 114 } 115 } 116 117 func feedStreamStrings(s chan string, target int) { 118 for i := 0; i < target; i++ { 119 s <- "hello" 120 } 121 } 122 123 func feedStreamInt(s chan int, target int) { 124 for i := 0; i < target; i++ { 125 s <- i 126 } 127 } 128 129 func feedStreamFloat(s chan float64, target int) { 130 for i := 0; i < target; i++ { 131 s <- float64(i) 132 } 133 } 134 135 func TestEncodeStream(t *testing.T) { 136 t.Run("single-consumer-object", func(t *testing.T) { 137 expectedStr := 138 `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false} 139 ` 140 // create our writer 141 w := &TestWriter{target: 100, mux: &sync.RWMutex{}} 142 enc := Stream.NewEncoder(w).LineDelimited() 143 w.enc = enc 144 s := StreamChanObject(make(chan *testObject)) 145 go enc.EncodeStream(s) 146 go feedStream(s, 100) 147 select { 148 case <-enc.Done(): 149 assert.Nil(t, enc.Err(), "enc.Err() should be nil") 150 assert.Len(t, w.result, 100, "w.result should be 100") 151 for _, b := range w.result { 152 assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string") 153 } 154 } 155 }) 156 157 t.Run("single-consumer-slice", func(t *testing.T) { 158 expectedStr := 159 `["test","test2"] 160 ` 161 // create our writer 162 w := &TestWriter{target: 100, mux: &sync.RWMutex{}} 163 enc := Stream.NewEncoder(w).LineDelimited() 164 w.enc = enc 165 s := StreamChanSlice(make(chan *TestEncodingArrStrings)) 166 go enc.EncodeStream(s) 167 go feedStreamSlices(s, 100) 168 select { 169 case <-enc.Done(): 170 assert.Nil(t, enc.Err(), "enc.Err() should be nil") 171 assert.Len(t, w.result, 100, "w.result should be 100") 172 for _, b := range w.result { 173 assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string") 174 } 175 } 176 }) 177 178 t.Run("single-consumer-string", func(t *testing.T) { 179 expectedStr := 180 `"hello" 181 ` 182 // create our writer 183 w := &TestWriter{target: 100, mux: &sync.RWMutex{}} 184 enc := Stream.NewEncoder(w).LineDelimited() 185 w.enc = enc 186 s := StreamChanString(make(chan string)) 187 go enc.EncodeStream(s) 188 go feedStreamStrings(s, 100) 189 select { 190 case <-enc.Done(): 191 assert.Nil(t, enc.Err(), "enc.Err() should be nil") 192 assert.Len(t, w.result, 100, "w.result should be 100") 193 for _, b := range w.result { 194 assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string") 195 } 196 } 197 }) 198 199 t.Run("single-consumer-object-nil-value", func(t *testing.T) { 200 expectedStr := `` 201 // create our writer 202 w := &TestWriter{target: 100, mux: &sync.RWMutex{}} 203 enc := Stream.NewEncoder(w).LineDelimited() 204 w.enc = enc 205 s := StreamChanObject(make(chan *testObject)) 206 go enc.EncodeStream(s) 207 go feedStreamNil(s, 100) 208 select { 209 case <-enc.Done(): 210 assert.Nil(t, enc.Err(), "enc.Err() should be nil") 211 assert.Nil(t, enc.Err(), "enc.Err() should not be nil") 212 for _, b := range w.result { 213 assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string") 214 } 215 } 216 }) 217 218 t.Run("single-consumer-int", func(t *testing.T) { 219 // create our writer 220 w := &TestWriter{target: 100, mux: &sync.RWMutex{}} 221 enc := Stream.NewEncoder(w).LineDelimited() 222 w.enc = enc 223 s := StreamChanInt(make(chan int)) 224 go enc.EncodeStream(s) 225 go feedStreamInt(s, 100) 226 select { 227 case <-enc.Done(): 228 assert.Nil(t, enc.Err(), "enc.Err() should be nil") 229 assert.Len(t, w.result, 100, "w.result should be 100") 230 } 231 }) 232 233 t.Run("single-consumer-float", func(t *testing.T) { 234 // create our writer 235 w := &TestWriter{target: 100, mux: &sync.RWMutex{}} 236 enc := Stream.NewEncoder(w).LineDelimited() 237 w.enc = enc 238 s := StreamChanFloat(make(chan float64)) 239 go enc.EncodeStream(s) 240 go feedStreamFloat(s, 100) 241 select { 242 case <-enc.Done(): 243 assert.Nil(t, enc.Err(), "enc.Err() should be nil") 244 assert.Len(t, w.result, 100, "w.result should be 100") 245 } 246 }) 247 248 t.Run("single-consumer-marshal-error", func(t *testing.T) { 249 // create our writer 250 w := &TestWriter{target: 100, mux: &sync.RWMutex{}} 251 enc := Stream.NewEncoder(w).LineDelimited() 252 w.enc = enc 253 s := StreamChanError(make(chan *testObject)) 254 go enc.EncodeStream(s) 255 go feedStream(s, 100) 256 select { 257 case <-enc.Done(): 258 assert.NotNil(t, enc.Err(), "enc.Err() should not be nil") 259 } 260 }) 261 262 t.Run("single-consumer-write-error", func(t *testing.T) { 263 // create our writer 264 w := TestWriterError("") 265 enc := Stream.NewEncoder(w).LineDelimited() 266 s := StreamChanObject(make(chan *testObject)) 267 go enc.EncodeStream(s) 268 go feedStream(s, 100) 269 select { 270 case <-enc.Done(): 271 assert.NotNil(t, enc.Err(), "enc.Err() should not be nil") 272 } 273 }) 274 275 t.Run("multiple-consumer-object-comma-delimited", func(t *testing.T) { 276 expectedStr := 277 `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false},` 278 // create our writer 279 w := &TestWriter{target: 5000, mux: &sync.RWMutex{}} 280 enc := Stream.BorrowEncoder(w).NConsumer(50).CommaDelimited() 281 w.enc = enc 282 s := StreamChanObject(make(chan *testObject)) 283 go enc.EncodeStream(s) 284 go feedStream(s, 5000) 285 select { 286 case <-enc.Done(): 287 assert.Nil(t, enc.Err(), "enc.Err() should be nil") 288 assert.Len(t, w.result, 5000, "w.result should be 100") 289 for _, b := range w.result { 290 assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string") 291 } 292 } 293 }) 294 295 t.Run("multiple-consumer-object-line-delimited", func(t *testing.T) { 296 expectedStr := 297 `{"testStr":"","testInt":0,"testInt64":0,"testInt32":0,"testInt16":0,"testInt8":0,"testUint64":0,"testUint32":0,"testUint16":0,"testUint8":0,"testFloat64":0,"testFloat32":0,"testBool":false} 298 ` 299 // create our writer 300 w := &TestWriter{target: 5000, mux: &sync.RWMutex{}} 301 enc := Stream.NewEncoder(w).NConsumer(50).LineDelimited() 302 w.enc = enc 303 s := StreamChanObject(make(chan *testObject)) 304 go feedStream(s, 5000) 305 go enc.EncodeStream(s) 306 select { 307 case <-enc.Done(): 308 assert.Nil(t, enc.Err(), "enc.Err() should be nil") 309 assert.Len(t, w.result, 5000, "w.result should be 100") 310 for _, b := range w.result { 311 assert.Equal(t, expectedStr, string(b), "every byte buffer should be equal to expected string") 312 } 313 } 314 }) 315 316 t.Run("multiple-consumer-object-chan-closed", func(t *testing.T) { 317 // create our writer 318 w := &TestWriter{target: 5000, mux: &sync.RWMutex{}} 319 enc := Stream.NewEncoder(w).NConsumer(50).LineDelimited() 320 w.enc = enc 321 s := StreamChanObject(make(chan *testObject)) 322 close(enc.done) 323 go feedStream(s, 5000) 324 go enc.EncodeStream(s) 325 select { 326 case <-enc.Done(): 327 assert.Nil(t, enc.Err(), "enc.Err() should be nil") 328 assert.Len(t, w.result, 0, "w.result should be 0") 329 } 330 }) 331 332 t.Run("encoder-deadline", func(t *testing.T) { 333 enc := Stream.NewEncoder(os.Stdout) 334 now := time.Now() 335 enc.SetDeadline(now) 336 d, _ := enc.Deadline() 337 assert.Equal(t, now, d, "deadline should be the one just set") 338 }) 339 340 t.Run("encoder-deadline-unset", func(t *testing.T) { 341 enc := Stream.NewEncoder(os.Stdout) 342 d, _ := enc.Deadline() 343 assert.Equal(t, time.Time{}, d, "deadline should be the one just set") 344 }) 345 346 // just for coverage 347 t.Run("encoder-context-value", func(t *testing.T) { 348 enc := Stream.NewEncoder(os.Stdout) 349 assert.Nil(t, enc.Value(""), "enc.Value should be nil") 350 }) 351 }