From da0fc9b42a64f894787ae08e981bbd180611892a Mon Sep 17 00:00:00 2001 From: David Thorpe Date: Wed, 31 Jul 2024 08:19:52 +0200 Subject: [PATCH] Added text stream implementation --- pkg/handler/auth/endpoints.go | 2 +- pkg/handler/certmanager/endpoints.go | 4 +- pkg/handler/nginx/endpoints.go | 4 +- pkg/httpresponse/httpresponse.go | 9 +- pkg/httpresponse/textstream.go | 166 ++++++++++++++++++++ pkg/httpresponse/textstream_test.go | 78 +++++++++ pkg/httpserver/httpresponse/httpresponse.go | 75 --------- 7 files changed, 254 insertions(+), 84 deletions(-) create mode 100644 pkg/httpresponse/textstream.go create mode 100644 pkg/httpresponse/textstream_test.go delete mode 100644 pkg/httpserver/httpresponse/httpresponse.go diff --git a/pkg/handler/auth/endpoints.go b/pkg/handler/auth/endpoints.go index 4946dc7..5ee6c41 100644 --- a/pkg/handler/auth/endpoints.go +++ b/pkg/handler/auth/endpoints.go @@ -105,7 +105,7 @@ func (service *auth) CreateToken(w http.ResponseWriter, r *http.Request) { var req TokenCreate // Get the request - if err := httprequest.Read(r, &req); err != nil { + if err := httprequest.Body(&req, r); err != nil { httpresponse.Error(w, http.StatusBadRequest, err.Error()) return } diff --git a/pkg/handler/certmanager/endpoints.go b/pkg/handler/certmanager/endpoints.go index 65dd59f..9c29eea 100644 --- a/pkg/handler/certmanager/endpoints.go +++ b/pkg/handler/certmanager/endpoints.go @@ -193,7 +193,7 @@ func (service *certmanager) reqCreateCA(w http.ResponseWriter, r *http.Request) var req reqCreateCA // Get the request - if err := httprequest.Read(r, &req); err != nil { + if err := httprequest.Body(&req, r); err != nil { httpresponse.Error(w, http.StatusBadRequest, err.Error()) return } @@ -211,7 +211,7 @@ func (service *certmanager) reqCreateCert(w http.ResponseWriter, r *http.Request var req reqCreateCert // Get the request - if err := httprequest.Read(r, &req); err != nil { + if err := httprequest.Body(&req, r); err != nil { httpresponse.Error(w, http.StatusBadRequest, err.Error()) return } diff --git a/pkg/handler/nginx/endpoints.go b/pkg/handler/nginx/endpoints.go index 61bb143..8cb35d3 100644 --- a/pkg/handler/nginx/endpoints.go +++ b/pkg/handler/nginx/endpoints.go @@ -171,7 +171,7 @@ func (service *nginx) ReadConfig(w http.ResponseWriter, r *http.Request) { // Create a new configuration func (service *nginx) CreateConfig(w http.ResponseWriter, r *http.Request) { var create responseTemplate - if err := httprequest.Read(r, &create); err != nil { + if err := httprequest.Body(&create, r); err != nil { httpresponse.Error(w, http.StatusBadRequest, err.Error()) return } else if create.Name == "" { @@ -264,7 +264,7 @@ func (service *nginx) DeleteConfig(tmpl *folders.Template, w http.ResponseWriter func (service *nginx) PatchConfig(tmpl *folders.Template, w http.ResponseWriter, r *http.Request) { var patch responseTemplate var modified bool - if err := httprequest.Read(r, &patch); err != nil { + if err := httprequest.Body(&patch, r); err != nil { httpresponse.Error(w, http.StatusBadRequest, err.Error()) return } diff --git a/pkg/httpresponse/httpresponse.go b/pkg/httpresponse/httpresponse.go index 86a68f5..5e67306 100644 --- a/pkg/httpresponse/httpresponse.go +++ b/pkg/httpresponse/httpresponse.go @@ -24,10 +24,11 @@ type ErrorResponse struct { // GLOBALS const ( - ContentTypeKey = "Content-Type" - ContentLengthKey = "Content-Length" - ContentTypeJSON = "application/json" - ContentTypeText = "text/plain" + ContentTypeKey = "Content-Type" + ContentLengthKey = "Content-Length" + ContentTypeJSON = "application/json" + ContentTypeText = "text/plain" + ContentTypeTextStream = "text/event-stream" ) /////////////////////////////////////////////////////////////////////////////// diff --git a/pkg/httpresponse/textstream.go b/pkg/httpresponse/textstream.go new file mode 100644 index 0000000..98716e9 --- /dev/null +++ b/pkg/httpresponse/textstream.go @@ -0,0 +1,166 @@ +package httpresponse + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "net/http" + "sync" + "time" +) + +/////////////////////////////////////////////////////////////////////////////// +// TYPES + +// TextStream implements a stream of text events +type TextStream struct { + wg sync.WaitGroup + w io.Writer + ch chan *textevent + err error +} + +type textevent struct { + name string + data []any +} + +/////////////////////////////////////////////////////////////////////////////// +// GLOBALS + +const ( + defaultKeepAlive = 10 * time.Second +) + +var ( + strPing = "ping" + strEvent = []byte("event: ") + strData = []byte("data: ") + strNewline = []byte("\n") +) + +/////////////////////////////////////////////////////////////////////////////// +// LIFECYCLE + +// Create a new text stream with mimetype text/event-stream +// Additional header tuples can be provided as a series of key-value pairs +func NewTextStream(w http.ResponseWriter, tuples ...string) *TextStream { + // Check parameters + if w == nil { + return nil + } + if len(tuples)%2 != 0 { + return nil + } + + // Create a text stream + self := new(TextStream) + self.w = w + self.ch = make(chan *textevent) + + // Set the default content type + w.Header().Set(ContentTypeKey, ContentTypeTextStream) + + // Set additional headers + for i := 0; i < len(tuples); i += 2 { + w.Header().Set(tuples[i], tuples[i+1]) + } + + // Write the response, don't know is this is the right one + w.WriteHeader(http.StatusContinue) + + // goroutine will write to the response writer until the channel is closed + self.wg.Add(1) + go func() { + defer self.wg.Done() + + // Create a ticker for ping messages + ticker := time.NewTimer(100 * time.Millisecond) + defer ticker.Stop() + + // Run until the channel is closed + for { + select { + case evt := <-self.ch: + if evt == nil { + return + } + self.emit(evt) + case <-ticker.C: + self.err = errors.Join(self.err, self.emit(&textevent{strPing, nil})) + ticker.Reset(defaultKeepAlive) + } + } + }() + + // Return the textstream object + return self +} + +// Close the text stream to stop sending ping messages +func (s *TextStream) Close() error { + // Close the channel + close(s.ch) + + // Wait for the goroutine to finish + s.wg.Wait() + + // Return any errors + return s.err +} + +/////////////////////////////////////////////////////////////////////////////// +// PUBLIC METHODS + +// Write a text event to the stream, and one or more optional data objects +// which are encoded as JSON +func (s *TextStream) Write(name string, data ...any) { + s.ch <- &textevent{name, data} +} + +/////////////////////////////////////////////////////////////////////////////// +// PRIVATE METHODS + +// emit an event to the stream +func (s *TextStream) emit(e *textevent) error { + var result error + + // Write the event to the stream + if e.name != "" { + if err := s.write(strEvent, []byte(e.name), strNewline); err != nil { + return err + } + } + + // Write the data to the stream + for _, v := range e.data { + if v == nil { + continue + } else if data, err := json.Marshal(v); err != nil { + result = errors.Join(result, err) + } else if err := s.write(strData, data, strNewline); err != nil { + result = errors.Join(result, err) + } + } + + // Flush the event + if result == nil { + if err := s.write(strNewline); err != nil { + result = errors.Join(result, err) + } + if w, ok := s.w.(http.Flusher); ok { + w.Flush() + } + } + + // Return any errors + return result +} + +func (s *TextStream) write(v ...[]byte) error { + if _, err := s.w.Write(bytes.Join(v, nil)); err != nil { + return err + } + return nil +} diff --git a/pkg/httpresponse/textstream_test.go b/pkg/httpresponse/textstream_test.go new file mode 100644 index 0000000..8ca018d --- /dev/null +++ b/pkg/httpresponse/textstream_test.go @@ -0,0 +1,78 @@ +package httpresponse_test + +import ( + "net/http/httptest" + "testing" + "time" + + // Packages + "github.com/mutablelogic/go-server/pkg/httpresponse" + "github.com/stretchr/testify/assert" +) + +func Test_textstream_001(t *testing.T) { + assert := assert.New(t) + + t.Run("New", func(t *testing.T) { + resp := httptest.NewRecorder() + ts := httpresponse.NewTextStream(resp) + assert.NotNil(ts) + t.Log(ts) + assert.NoError(ts.Close()) + }) + + t.Run("Ping", func(t *testing.T) { + resp := httptest.NewRecorder() + ts := httpresponse.NewTextStream(resp) + assert.NotNil(ts) + + time.Sleep(1 * time.Second) + assert.NoError(ts.Close()) + assert.Equal(100, resp.Code) + assert.Equal("text/event-stream", resp.Header().Get("Content-Type")) + assert.Equal("event: ping\n\n", resp.Body.String()) + }) + + t.Run("EventNoData", func(t *testing.T) { + resp := httptest.NewRecorder() + ts := httpresponse.NewTextStream(resp) + assert.NotNil(ts) + + ts.Write("foo") + + time.Sleep(1 * time.Second) + assert.NoError(ts.Close()) + assert.Equal(100, resp.Code) + assert.Equal("text/event-stream", resp.Header().Get("Content-Type")) + assert.Equal("event: foo\n\n"+"event: ping\n\n", resp.Body.String()) + }) + + t.Run("EventData", func(t *testing.T) { + resp := httptest.NewRecorder() + ts := httpresponse.NewTextStream(resp) + assert.NotNil(ts) + + ts.Write("foo", "bar") + + time.Sleep(1 * time.Second) + assert.NoError(ts.Close()) + assert.Equal(100, resp.Code) + assert.Equal("text/event-stream", resp.Header().Get("Content-Type")) + assert.Equal("event: foo\n"+"data: \"bar\"\n\n"+"event: ping\n\n", resp.Body.String()) + }) + + t.Run("EventDataData", func(t *testing.T) { + resp := httptest.NewRecorder() + ts := httpresponse.NewTextStream(resp) + assert.NotNil(ts) + + ts.Write("foo", "bar1", "bar2") + + time.Sleep(1 * time.Second) + assert.NoError(ts.Close()) + assert.Equal(100, resp.Code) + assert.Equal("text/event-stream", resp.Header().Get("Content-Type")) + assert.Equal("event: foo\n"+"data: \"bar1\"\n"+"data: \"bar2\"\n\n"+"event: ping\n\n", resp.Body.String()) + }) + +} diff --git a/pkg/httpserver/httpresponse/httpresponse.go b/pkg/httpserver/httpresponse/httpresponse.go deleted file mode 100644 index 63aa003..0000000 --- a/pkg/httpserver/httpresponse/httpresponse.go +++ /dev/null @@ -1,75 +0,0 @@ -package httpresponse - -import ( - "encoding/json" - "net/http" - "strings" -) - -/////////////////////////////////////////////////////////////////////////////// -// TYPES - -// ErrorResponse is a generic error response which is served as JSON using the -// ServeError method -type ErrorResponse struct { - Code int `json:"code"` - Reason string `json:"reason,omitempty"` -} - -/////////////////////////////////////////////////////////////////////////////// -// GLOBALS - -const ( - ContentTypeKey = "Content-Type" - ContentLengthKey = "Content-Length" - ContentTypeJSON = "application/json" - ContentTypeText = "text/plain" -) - -/////////////////////////////////////////////////////////////////////////////// -// PUBLIC METHODS - -// JSON is a utility function to serve an arbitary object as JSON -func JSON(w http.ResponseWriter, v interface{}, code int, indent uint) error { - if w == nil { - return nil - } - w.Header().Set(ContentTypeKey, ContentTypeJSON) - w.WriteHeader(code) - enc := json.NewEncoder(w) - if indent > 0 { - enc.SetIndent("", strings.Repeat(" ", int(indent))) - } - return enc.Encode(v) -} - -// Text is a utility function to serve plaintext -func Text(w http.ResponseWriter, v string, code int) { - if w == nil { - return - } - w.Header().Set(ContentTypeKey, ContentTypeText) - w.WriteHeader(code) - w.Write([]byte(v + "\n")) -} - -// Empty is a utility function to serve an empty response -func Empty(w http.ResponseWriter, code int) { - if w == nil { - return - } - w.Header().Set(ContentLengthKey, "0") - w.WriteHeader(code) -} - -// Error is a utility function to serve a JSON error notice -func Error(w http.ResponseWriter, code int, reason ...string) error { - if w == nil { - return nil - } - err := ErrorResponse{code, strings.Join(reason, " ")} - if len(reason) == 0 { - err.Reason = http.StatusText(int(code)) - } - return JSON(w, err, code, 0) -}