future.go (2010B)
1 package futures 2 3 import ( 4 "errors" 5 "sync" 6 "sync/atomic" 7 ) 8 9 var ( 10 ErrNotDone = errors.New("not done yet") 11 12 closedChan = make(chan struct{}) 13 ) 14 15 func init() { 16 close(closedChan) 17 } 18 19 type Future[T any] struct { 20 mu sync.Mutex 21 done atomic.Value 22 23 isDone bool 24 value T 25 error error 26 } 27 28 func Run[T any](handler func() (T, error)) *Future[T] { 29 f := &Future[T]{} 30 31 go func() { 32 value, err := handler() 33 34 f.mu.Lock() 35 if ch, ok := f.done.Load().(chan struct{}); ok { 36 close(ch) 37 } 38 f.isDone = true 39 if err != nil { 40 f.error = err 41 } else { 42 f.value = value 43 } 44 f.mu.Unlock() 45 }() 46 47 return f 48 } 49 50 func Resolve[T any](value T) *Future[T] { 51 return &Future[T]{ 52 value: value, 53 isDone: true, 54 } 55 } 56 57 func Reject[T any](err error) *Future[T] { 58 return &Future[T]{ 59 error: err, 60 isDone: true, 61 } 62 } 63 64 func (f *Future[T]) Done() <-chan struct{} { 65 f.mu.Lock() 66 defer f.mu.Unlock() 67 68 return f.unsafeDone() 69 } 70 71 func Done[T any](f *Future[T]) <-chan struct{} { 72 return f.Done() 73 } 74 75 func (f *Future[T]) Await() (value T, err error) { 76 f.mu.Lock() 77 done := f.unsafeDone() 78 f.mu.Unlock() 79 80 // Wait until the task is done 81 _, _ = <-done 82 83 return f.value, f.error 84 } 85 86 func Await[T any](f *Future[T]) (T, error) { 87 return f.Await() 88 } 89 90 func (f *Future[T]) Unwrap() (value T, err error) { 91 if !f.isDone { 92 err = ErrNotDone 93 return 94 } 95 96 return f.value, f.error 97 } 98 99 func Unwrap[T any](f *Future[T]) (T, error) { 100 return f.Unwrap() 101 } 102 103 func Map[T, R any](f *Future[T], action func(T) (R, error)) *Future[R] { 104 return Run[R](func() (result R, err error) { 105 var value T 106 value, err = f.Await() 107 if err != nil { 108 return 109 } 110 return action(value) 111 }) 112 } 113 114 func FlatMap[T, R any](f *Future[T], action func(T) *Future[R]) *Future[R] { 115 return Map[T, R](f, func(value T) (R, error) { 116 return action(value).Await() 117 }) 118 } 119 120 func (f *Future[T]) unsafeDone() <-chan struct{} { 121 if f.isDone { 122 return closedChan 123 } 124 125 d := f.done.Load() 126 if d == nil { 127 d = make(chan struct{}) 128 f.done.Store(d) 129 } 130 131 return d.(chan struct{}) 132 }