functions

The Fool guy's FAAS
git clone git://git.lair.cx/functions
Log | Files | Refs | README

commit f386b02c74521a0f7f152696f7c1fd61562b093e
Author: Yongbin Kim <iam@yongbin.kim>
Date:   Sun,  3 Sep 2023 04:39:51 +0900

First Commit

Signed-off-by: Yongbin Kim <iam@yongbin.kim>

Diffstat:
A.gitignore | 23+++++++++++++++++++++++
AMakefile | 10++++++++++
Acmd/functions/log_debug.go | 18++++++++++++++++++
Acmd/functions/log_prod.go | 16++++++++++++++++
Acmd/functions/main.go | 317+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aconfigs/configs.go | 12++++++++++++
Aexamples/helloworld/main.go | 34++++++++++++++++++++++++++++++++++
Ago.mod | 8++++++++
Ago.sum | 4++++
Ainternal/functions/stdin.go | 67+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Apkg/functions/helper.go | 44++++++++++++++++++++++++++++++++++++++++++++
Apkg/functions/request.go | 13+++++++++++++
Apkg/functions/response.go | 97+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
13 files changed, 663 insertions(+), 0 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -0,0 +1,23 @@ +# Allowlisting gitignore template for GO projects prevents us +# from adding various unwanted local files, such as generated +# files, developer configurations or IDE-specific files etc. +# +# Recommended: Go.AllowList.gitignore + +# Ignore everything +* + +# But not these files... +!/.gitignore + +!*.go +!go.sum +!go.mod + +!README.md +!LICENSE + +!Makefile + +# ...even if they are in subdirectories +!*/ diff --git a/Makefile b/Makefile @@ -0,0 +1,9 @@ +FUNCTIONS_DIR=./.functions/examples +EXAMPLES_DIR=./examples + +$(shell mkdir -p $(FUNCTIONS_DIR)) + +.PHONY: examples +examples: + find $(EXAMPLES_DIR)/* -maxdepth 0 -type d -exec basename {} \; | \ + xargs -I {} go build -tags example -o $(FUNCTIONS_DIR)/{} $(EXAMPLES_DIR)/{} +\ No newline at end of file diff --git a/cmd/functions/log_debug.go b/cmd/functions/log_debug.go @@ -0,0 +1,18 @@ +//go:build !prod + +package main + +import ( + "log/slog" + "os" +) + +func init() { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + + slog.SetDefault(logger) + + slog.Warn("Debug logging enabled") +} diff --git a/cmd/functions/log_prod.go b/cmd/functions/log_prod.go @@ -0,0 +1,16 @@ +//go:build prod + +package main + +import ( + "log/slog" + "os" +) + +func init() { + logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + + slog.SetDefault(logger) +} diff --git a/cmd/functions/main.go b/cmd/functions/main.go @@ -0,0 +1,317 @@ +package main + +import ( + "bufio" + "bytes" + "context" + "errors" + "go.lair.cx/functions/configs" + "go.lair.cx/functions/internal/functions" + "golang.org/x/sys/unix" + "io" + "log/slog" + "net/http" + "os" + "os/exec" + "os/signal" + "path/filepath" + "regexp" + "strconv" + "strings" + "sync" +) + +var ( + regexResponseStatusLine = regexp.MustCompile(`^HTTP/1\.[01] ([0-9]{3})`) +) + +func main() { + server := &http.Server{ + Addr: configs.ServerAddr, + ReadTimeout: configs.ReadTimeout, + WriteTimeout: configs.WriteTimeout, + Handler: http.HandlerFunc(handler), + } + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt) + + go func() { + sig := <-sigChan + if sig == nil { + return + } + + slog.Info("Shutting down server...") + + ctx, cancel := context.WithTimeout(context.Background(), configs.ShutdownTimeout) + defer cancel() + + err := server.Shutdown(ctx) + if err != nil { + slog.Error("Server shutdown failed", "error", err) + os.Exit(1) + } + }() + + slog.Info( + "Starting server...", + "addr", configs.ServerAddr, + ) + err := server.ListenAndServe() + if err != nil && !errors.Is(err, http.ErrServerClosed) { + slog.Error("Server closed with error", "error", err) + os.Exit(1) + } + + select { + case sigChan <- nil: + default: + } +} + +func handler(w http.ResponseWriter, r *http.Request) { + namespace, function, ok := readURL(r.Host, r.URL.Path) + if !ok { + slog.Debug( + "Invalid request", + "host", r.Host, + "method", r.Method, + "url", r.URL.String(), + ) + http.NotFound(w, r) + return + } + + slog.Debug( + "Function request", + "namespace", namespace, + "function", function, + ) + + // Find the executable file. + file, ok := findExec(namespace, function) + if !ok { + slog.Debug( + "Function not found", + "namespace", namespace, + "function", function, + ) + http.NotFound(w, r) + return + } + + slog.Debug( + "Function found", + "namespace", namespace, + "function", function, + "file", file, + ) + + // Create a reader for the request. + requestReader := functions.NewHttpRequestReader(r) + defer func(requestReader *functions.HttpRequestReader) { + _ = requestReader.Close() + }(requestReader) + + // Create a wait group to wait for stdout and stderr to finish. + wg := sync.WaitGroup{} + wg.Add(2) + + // Create the command. + cmd := exec.Command(file) + cmd.Stdin = requestReader + + // Handle stdout and stderr + stdout, err := cmd.StdoutPipe() + if err != nil { + slog.Error("Failed to create stdout pipe", "error", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + go handleStdout(w, stdout, &wg, namespace, function) + + stderr, err := cmd.StderrPipe() + if err != nil { + slog.Error("Failed to create stderr pipe", "error", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + go handleStderr(stderr, &wg, namespace, function) + + // Start the function and wait for it to finish. + slog.Info( + "Starting function", + "namespace", namespace, + "function", function, + ) + err = cmd.Run() + if err != nil { + slog.Error("Failed to start function", "error", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + wg.Wait() +} + +func handleStdout(w http.ResponseWriter, stdout io.Reader, wg *sync.WaitGroup, namespace, function string) { + defer wg.Done() + + r := bufio.NewReader(stdout) + + // Read the first line, which is the HTTP status line + statusLine, err := r.ReadString('\n') + if err != nil { + slog.Error( + "Failed to read status line", + "namespace", namespace, + "function", function, + "error", err, + ) + return + } + + caps := regexResponseStatusLine.FindStringSubmatch(statusLine) + if len(caps) != 2 { + slog.Error( + "Invalid status line", + "namespace", namespace, + "function", function, + "line", statusLine, + ) + return + } + + statusCode, err := strconv.Atoi(caps[1]) + if err != nil { + slog.Error( + "Invalid status code", + "namespace", namespace, + "function", function, + "code", caps[1], + ) + return + } + + w.WriteHeader(statusCode) + + // Read headers + for { + line, err := r.ReadString('\n') + if err != nil { + slog.Error( + "Failed to read header line", + "namespace", namespace, + "function", function, + "error", err, + ) + return + } + + line = line[:len(line)-2] + if len(line) == 0 { + break + } + + parts := strings.SplitN(line, ": ", 2) + if len(parts) != 2 { + slog.Error( + "Invalid header line", + "namespace", namespace, + "function", function, + "line", line, + ) + return + } + + w.Header().Add(parts[0], parts[1]) + } + + // Copy the rest of the body + _, err = io.Copy(w, r) +} + +func handleStderr(stderr io.Reader, wg *sync.WaitGroup, namespace, function string) { + defer wg.Done() + + var buf bytes.Buffer + _, err := io.Copy(&buf, stderr) + if err != nil { + slog.Error("Failed to read stderr", "error", err) + return + } + + if buf.Len() == 0 { + return + } + + slog.Error( + "stderr from function", + "namespace", namespace, + "function", function, + "stderr", buf.String(), + ) +} + +func stripHostSuffix(host, suffix string) (string, bool) { + // Strip the port number. + if i := strings.IndexByte(host, ':'); i >= 0 { + host = host[:i] + } + + // Check if the host ends with the suffix. + cond := len(host) >= len(suffix)+1 && + host[len(host)-len(suffix):] == suffix && + host[len(host)-len(suffix)-1] == '.' + if !cond { + return "", false + } + + return host[:len(host)-len(suffix)-1], true +} + +func readURL(host, path string) (namespace, function string, ok bool) { + namespace, ok = stripHostSuffix(host, configs.RootDomain) + if !ok { + return "", "", false + } + + if len(path) == 0 || path[0] != '/' { + return "", "", false + } + + function = path[1:] + + if function == "" { + function = "$default" + } + + return namespace, function, true +} + +func isUserExecutable(path string) bool { + err := unix.Access(path, unix.X_OK) + if err != nil { + slog.Debug( + "File is not executable", + "file", path, + "error", err, + ) + return false + } + return true +} + +func findExec(namespace, function string) (string, bool) { + file := filepath.Join(configs.FuncDir, namespace, function) + slog.Debug("Looking for executable", "file", file) + + if !isUserExecutable(file) { + return "", false + } + + return file, true +} diff --git a/configs/configs.go b/configs/configs.go @@ -0,0 +1,12 @@ +package configs + +import "time" + +const ( + RootDomain = "functions.dev" + FuncDir = ".functions" + ServerAddr = ":8080" + ReadTimeout = 5 * time.Second + WriteTimeout = 5 * time.Second + ShutdownTimeout = 5 * time.Second +) diff --git a/examples/helloworld/main.go b/examples/helloworld/main.go @@ -0,0 +1,34 @@ +//go:build example + +package main + +import ( + "go.lair.cx/functions/pkg/functions" + "log" + "net/http" +) + +func main() { + functions.ServeFunc(ServeHTTP) +} + +func ServeHTTP(w http.ResponseWriter, r *http.Request) { + name := r.URL.Query().Get("name") + if name == "" { + name = "world" + } + + if name == "panic" { + panic("panic because name is panic") + } + + buf := make([]byte, 0, len(name)+8) + buf = append(buf, "Hello, "...) + buf = append(buf, name...) + buf = append(buf, "!\n"...) + + _, err := w.Write(buf) + if err != nil { + log.Fatalln(err) + } +} diff --git a/go.mod b/go.mod @@ -0,0 +1,8 @@ +module go.lair.cx/functions + +go 1.21 + +require ( + github.com/valyala/bytebufferpool v1.0.0 // indirect + golang.org/x/sys v0.12.0 // indirect +) diff --git a/go.sum b/go.sum @@ -0,0 +1,4 @@ +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/functions/stdin.go b/internal/functions/stdin.go @@ -0,0 +1,67 @@ +package functions + +import ( + "github.com/valyala/bytebufferpool" + "io" + "net/http" +) + +type HttpRequestReader struct { + buf *bytebufferpool.ByteBuffer + body io.Reader +} + +func NewHttpRequestReader(r *http.Request) *HttpRequestReader { + buf := bytebufferpool.Get() + + buf.B = append(buf.B, r.Method...) + buf.B = append(buf.B, ' ') + buf.B = append(buf.B, r.URL.EscapedPath()...) + if r.URL.RawQuery != "" { + buf.B = append(buf.B, '?') + buf.B = append(buf.B, r.URL.RawQuery...) + } + buf.B = append(buf.B, ' ') + buf.B = append(buf.B, r.Proto...) + buf.B = append(buf.B, '\r', '\n') + + for k, values := range r.Header { + for _, v := range values { + buf.B = append(buf.B, k...) + buf.B = append(buf.B, ": "...) + buf.B = append(buf.B, v...) + buf.B = append(buf.B, '\r', '\n') + } + } + + buf.B = append(buf.B, '\r', '\n') + + return &HttpRequestReader{ + buf: buf, + body: r.Body, + } +} + +func (r *HttpRequestReader) Close() error { + bytebufferpool.Put(r.buf) + return nil +} + +// Read implements the io.Reader interface. +// It reads "raw" HTTP request data, including the request line, headers and body. +func (r *HttpRequestReader) Read(p []byte) (n int, err error) { + cursor := 0 + + if r.buf.Len() > 0 { + n = copy(p, r.buf.B) + r.buf.B = r.buf.B[n:] + cursor += n + } + + if cursor == len(p) { + return cursor, nil + } + + n, err = r.body.Read(p[cursor:]) + return n + cursor, err +} diff --git a/pkg/functions/helper.go b/pkg/functions/helper.go @@ -0,0 +1,44 @@ +package functions + +import ( + "log" + "net/http" + "os" +) + +func Serve(handler http.Handler) { + req, err := ReadRequest() + if err != nil { + log.Fatalln(err) + } + + res := NewResponseWriter() + defer closeResponseWriter(res) + + defer func() { + err := recover() + if err == nil { + return + } + + // Print panic message and stack trace. + _, _ = os.Stderr.WriteString("panic: ") + + // Reset response writer. + resetResponseWriter(res) + + // Send HTTP 500 Internal Server Error response. + res.WriteHeader(http.StatusInternalServerError) + _, _ = res.Write([]byte("Internal Server Error\n")) + + // Flush response. + flushResponseWriter(res) + }() + + handler.ServeHTTP(res, req) + flushResponseWriter(res) +} + +func ServeFunc(handler func(http.ResponseWriter, *http.Request)) { + Serve(http.HandlerFunc(handler)) +} diff --git a/pkg/functions/request.go b/pkg/functions/request.go @@ -0,0 +1,13 @@ +package functions + +import ( + "bufio" + "net/http" + "os" +) + +// ReadRequest reads an HTTP request from stdin. +func ReadRequest() (*http.Request, error) { + r := bufio.NewReader(os.Stdin) + return http.ReadRequest(r) +} diff --git a/pkg/functions/response.go b/pkg/functions/response.go @@ -0,0 +1,97 @@ +package functions + +import ( + "github.com/valyala/bytebufferpool" + "net/http" + "os" + "strconv" +) + +type responseWriter struct { + headers http.Header + isHeaderWritten bool + + buf *bytebufferpool.ByteBuffer +} + +func NewResponseWriter() http.ResponseWriter { + return &responseWriter{ + headers: make(http.Header), + buf: bytebufferpool.Get(), + } +} + +func (r *responseWriter) close() { + bytebufferpool.Put(r.buf) +} + +func closeResponseWriter(w http.ResponseWriter) { + if rw, ok := w.(*responseWriter); ok { + rw.close() + } +} + +func (r *responseWriter) reset() { + r.headers = make(http.Header) + r.isHeaderWritten = false + r.buf.Reset() +} + +func resetResponseWriter(w http.ResponseWriter) { + if rw, ok := w.(*responseWriter); ok { + rw.reset() + } +} + +func (r *responseWriter) flush() { + _, _ = os.Stdout.Write(r.buf.B) +} + +func flushResponseWriter(w http.ResponseWriter) { + if rw, ok := w.(*responseWriter); ok { + rw.flush() + } +} + +func (r *responseWriter) Header() http.Header { + return r.headers +} + +func (r *responseWriter) WriteHeader(statusCode int) { + _, _ = r.writeHeader(statusCode) +} + +func (r *responseWriter) writeHeader(statusCode int) (int, error) { + if r.isHeaderWritten { + return 0, nil + } + r.isHeaderWritten = true + + r.buf.B = append(r.buf.B, "HTTP/1.1 "...) + r.buf.B = append(r.buf.B, strconv.Itoa(statusCode)...) + r.buf.B = append(r.buf.B, '\r', '\n') + + for k, values := range r.headers { + for _, v := range values { + r.buf.B = append(r.buf.B, k...) + r.buf.B = append(r.buf.B, ": "...) + r.buf.B = append(r.buf.B, v...) + r.buf.B = append(r.buf.B, '\r', '\n') + } + } + + r.buf.B = append(r.buf.B, '\r', '\n') + + return len(r.buf.B), nil +} + +func (r *responseWriter) Write(bytes []byte) (int, error) { + if !r.isHeaderWritten { + _, err := r.writeHeader(http.StatusOK) + if err != nil { + return 0, err + } + } + r.buf.B = append(r.buf.B, bytes...) + return len(bytes), nil +}