Skip to content

Commit

Permalink
Added text stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
djthorpe committed Jul 31, 2024
1 parent cb51a2f commit da0fc9b
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 84 deletions.
2 changes: 1 addition & 1 deletion pkg/handler/auth/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (service *auth) CreateToken(w http.ResponseWriter, r *http.Request) {
var req TokenCreate

// Get the request
if err := httprequest.Read(r, &req); err != nil {
if err := httprequest.Body(&req, r); err != nil {
httpresponse.Error(w, http.StatusBadRequest, err.Error())
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/certmanager/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (service *certmanager) reqCreateCA(w http.ResponseWriter, r *http.Request)
var req reqCreateCA

// Get the request
if err := httprequest.Read(r, &req); err != nil {
if err := httprequest.Body(&req, r); err != nil {
httpresponse.Error(w, http.StatusBadRequest, err.Error())
return
}
Expand All @@ -211,7 +211,7 @@ func (service *certmanager) reqCreateCert(w http.ResponseWriter, r *http.Request
var req reqCreateCert

// Get the request
if err := httprequest.Read(r, &req); err != nil {
if err := httprequest.Body(&req, r); err != nil {
httpresponse.Error(w, http.StatusBadRequest, err.Error())
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/nginx/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (service *nginx) ReadConfig(w http.ResponseWriter, r *http.Request) {
// Create a new configuration
func (service *nginx) CreateConfig(w http.ResponseWriter, r *http.Request) {
var create responseTemplate
if err := httprequest.Read(r, &create); err != nil {
if err := httprequest.Body(&create, r); err != nil {
httpresponse.Error(w, http.StatusBadRequest, err.Error())
return
} else if create.Name == "" {
Expand Down Expand Up @@ -264,7 +264,7 @@ func (service *nginx) DeleteConfig(tmpl *folders.Template, w http.ResponseWriter
func (service *nginx) PatchConfig(tmpl *folders.Template, w http.ResponseWriter, r *http.Request) {
var patch responseTemplate
var modified bool
if err := httprequest.Read(r, &patch); err != nil {
if err := httprequest.Body(&patch, r); err != nil {
httpresponse.Error(w, http.StatusBadRequest, err.Error())
return
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/httpresponse/httpresponse.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ type ErrorResponse struct {
// GLOBALS

const (
ContentTypeKey = "Content-Type"
ContentLengthKey = "Content-Length"
ContentTypeJSON = "application/json"
ContentTypeText = "text/plain"
ContentTypeKey = "Content-Type"
ContentLengthKey = "Content-Length"
ContentTypeJSON = "application/json"
ContentTypeText = "text/plain"
ContentTypeTextStream = "text/event-stream"
)

///////////////////////////////////////////////////////////////////////////////
Expand Down
166 changes: 166 additions & 0 deletions pkg/httpresponse/textstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package httpresponse

import (
"bytes"
"encoding/json"
"errors"
"io"
"net/http"
"sync"
"time"
)

///////////////////////////////////////////////////////////////////////////////
// TYPES

// TextStream implements a stream of text events
type TextStream struct {
wg sync.WaitGroup
w io.Writer
ch chan *textevent
err error
}

type textevent struct {
name string
data []any
}

///////////////////////////////////////////////////////////////////////////////
// GLOBALS

const (
defaultKeepAlive = 10 * time.Second
)

var (
strPing = "ping"
strEvent = []byte("event: ")
strData = []byte("data: ")
strNewline = []byte("\n")
)

///////////////////////////////////////////////////////////////////////////////
// LIFECYCLE

// Create a new text stream with mimetype text/event-stream
// Additional header tuples can be provided as a series of key-value pairs
func NewTextStream(w http.ResponseWriter, tuples ...string) *TextStream {
// Check parameters
if w == nil {
return nil
}
if len(tuples)%2 != 0 {
return nil
}

// Create a text stream
self := new(TextStream)
self.w = w
self.ch = make(chan *textevent)

// Set the default content type
w.Header().Set(ContentTypeKey, ContentTypeTextStream)

// Set additional headers
for i := 0; i < len(tuples); i += 2 {
w.Header().Set(tuples[i], tuples[i+1])
}

// Write the response, don't know is this is the right one
w.WriteHeader(http.StatusContinue)

// goroutine will write to the response writer until the channel is closed
self.wg.Add(1)
go func() {
defer self.wg.Done()

// Create a ticker for ping messages
ticker := time.NewTimer(100 * time.Millisecond)
defer ticker.Stop()

// Run until the channel is closed
for {
select {
case evt := <-self.ch:
if evt == nil {
return
}
self.emit(evt)
case <-ticker.C:
self.err = errors.Join(self.err, self.emit(&textevent{strPing, nil}))
ticker.Reset(defaultKeepAlive)
}
}
}()

// Return the textstream object
return self
}

// Close the text stream to stop sending ping messages
func (s *TextStream) Close() error {
// Close the channel
close(s.ch)

// Wait for the goroutine to finish
s.wg.Wait()

// Return any errors
return s.err
}

///////////////////////////////////////////////////////////////////////////////
// PUBLIC METHODS

// Write a text event to the stream, and one or more optional data objects
// which are encoded as JSON
func (s *TextStream) Write(name string, data ...any) {
s.ch <- &textevent{name, data}
}

///////////////////////////////////////////////////////////////////////////////
// PRIVATE METHODS

// emit an event to the stream
func (s *TextStream) emit(e *textevent) error {
var result error

// Write the event to the stream
if e.name != "" {
if err := s.write(strEvent, []byte(e.name), strNewline); err != nil {
return err
}
}

// Write the data to the stream
for _, v := range e.data {
if v == nil {
continue
} else if data, err := json.Marshal(v); err != nil {
result = errors.Join(result, err)
} else if err := s.write(strData, data, strNewline); err != nil {
result = errors.Join(result, err)
}
}

// Flush the event
if result == nil {
if err := s.write(strNewline); err != nil {
result = errors.Join(result, err)
}
if w, ok := s.w.(http.Flusher); ok {
w.Flush()
}
}

// Return any errors
return result
}

func (s *TextStream) write(v ...[]byte) error {
if _, err := s.w.Write(bytes.Join(v, nil)); err != nil {
return err
}
return nil
}
78 changes: 78 additions & 0 deletions pkg/httpresponse/textstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package httpresponse_test

import (
"net/http/httptest"
"testing"
"time"

// Packages
"github.com/mutablelogic/go-server/pkg/httpresponse"
"github.com/stretchr/testify/assert"
)

func Test_textstream_001(t *testing.T) {
assert := assert.New(t)

t.Run("New", func(t *testing.T) {
resp := httptest.NewRecorder()
ts := httpresponse.NewTextStream(resp)
assert.NotNil(ts)
t.Log(ts)
assert.NoError(ts.Close())
})

t.Run("Ping", func(t *testing.T) {
resp := httptest.NewRecorder()
ts := httpresponse.NewTextStream(resp)
assert.NotNil(ts)

time.Sleep(1 * time.Second)
assert.NoError(ts.Close())
assert.Equal(100, resp.Code)
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
assert.Equal("event: ping\n\n", resp.Body.String())
})

t.Run("EventNoData", func(t *testing.T) {
resp := httptest.NewRecorder()
ts := httpresponse.NewTextStream(resp)
assert.NotNil(ts)

ts.Write("foo")

time.Sleep(1 * time.Second)
assert.NoError(ts.Close())
assert.Equal(100, resp.Code)
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
assert.Equal("event: foo\n\n"+"event: ping\n\n", resp.Body.String())
})

t.Run("EventData", func(t *testing.T) {
resp := httptest.NewRecorder()
ts := httpresponse.NewTextStream(resp)
assert.NotNil(ts)

ts.Write("foo", "bar")

time.Sleep(1 * time.Second)
assert.NoError(ts.Close())
assert.Equal(100, resp.Code)
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
assert.Equal("event: foo\n"+"data: \"bar\"\n\n"+"event: ping\n\n", resp.Body.String())
})

t.Run("EventDataData", func(t *testing.T) {
resp := httptest.NewRecorder()
ts := httpresponse.NewTextStream(resp)
assert.NotNil(ts)

ts.Write("foo", "bar1", "bar2")

time.Sleep(1 * time.Second)
assert.NoError(ts.Close())
assert.Equal(100, resp.Code)
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
assert.Equal("event: foo\n"+"data: \"bar1\"\n"+"data: \"bar2\"\n\n"+"event: ping\n\n", resp.Body.String())
})

}
75 changes: 0 additions & 75 deletions pkg/httpserver/httpresponse/httpresponse.go

This file was deleted.

0 comments on commit da0fc9b

Please sign in to comment.