Streaming


Streaming Result

Goa supports unidirectional server-side streaming: an endpoint receives a payload and streams back a sequence of results. All streamed results are instances of the same type.

This document describes how to define a method that streams a sequence of results, and what the generator produces for the transport-independent and transport-dependent code.

Design

The StreamingResult DSL can be used in a method to set up an endpoint that streams a sequence of results. StreamingResult has a syntax similar to that of the Result DSL. StreamingResult and Result are mutually exclusive: only one of them may be used inside a given Method expression.

var _ = Service("cellar", func() {
    Method("list", func() {
        // StoredBottle is sent to the client through a stream.
        StreamingResult(StoredBottle)
    })
})

The code generator produces the following stream interfaces for the list endpoint in the service package.

// Interface that the server must satisfy.
type ListServerStream interface {
    // Send streams instances of "StoredBottle".
    Send(*StoredBottle) error
    // Close the stream.
    Close() error
}

// Interface that the client must satisfy.
type ListClientStream interface {
    // Recv reads instances of "StoredBottle" from the stream.
    Recv() (*StoredBottle, error)
}

  • Send can be called zero or more times to stream result instances to the client. If Send returns an error, any subsequent call to Send also fails and Close does not need to be called.
  • Close closes the stream. Any subsequent call to Send returns an error.
  • Recv reads the next result instance from the stream. It returns io.EOF if the server closed the stream.

The List method signature in the Service interface accepts the server stream interface as one of its arguments. The generated Goa client returns the client stream interface.

The transport-dependent code implements the server and client stream interfaces described above using transport-specific streaming logic.

Here is an example service endpoint implementation that sends a stream of StoredBottle instances and then closes the stream.

// List lists the stored bottles.
func (s *cellarSvc) List(ctx context.Context, stream cellarsvc.ListServerStream) (err error) {
    bottles := loadStoredBottles()
    for _, c := range bottles {
        if err := stream.Send(c); err != nil {
            return err
        }
    }
    return stream.Close()
}

Streaming via HTTP

Streaming over HTTP uses WebSockets. Goa relies on the Gorilla WebSocket package to implement the server and client stream interfaces.

The Goa http package provides websocket Upgrader and Dialer interfaces, along with a websocket connection configurer function type that can be used to customize a connection obtained through the Upgrader or Dialer.

Here is an example that provides a custom websocket connection configuration to the server and client streams.

/* service main.go */

// Default upgrader generated by Goa:
// upgrader := &websocket.Upgrader{}
// Custom websocket upgrader
upgrader := &websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
}

// Websocket connection configurer
connConfigurer := func(conn *websocket.Conn) *websocket.Conn {
    conn.SetPingHandler(...)
    conn.SetPongHandler(...)
    conn.SetCloseHandler(...)
    return conn
}

cellarServer = cellarsvcsvr.New(cellarEndpoints, mux, dec, enc, eh, upgrader, connConfigurer)

/* client main.go */

// Default dialer generated by Goa:
// dialer = websocket.DefaultDialer
// Custom dialer
dialer = &websocket.Dialer{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
}

endpoint, payload, err := cli.ParseEndpoint(
    scheme,
    host,
    doer,
    goahttp.RequestEncoder,
    goahttp.ResponseDecoder,
    debug,
    dialer,
    connConfigurer,
)

Result Type with Multiple Views

If the method returns a result type with multiple views, a SetView method is generated in both interfaces with the following signature:

SetView(view string)

The application developer must call this method in the service endpoint implementation before sending data to the stream so that the result type is rendered with the appropriate view. If this method is never invoked, the default view is used to render the result type.

Here is an example that uses the requested view to render the stored bottles before they are sent to the stream.

// List lists the stored bottles.
func (s *cellarSvc) List(ctx context.Context, p *cellarsvc.ListPayload, stream cellarsvc.ListServerStream) (err error) {
    stream.SetView(p.View)
    bottles := loadStoredBottles()
    for _, c := range bottles {
        if err := stream.Send(c); err != nil {
            return err
        }
    }
    return stream.Close()
}
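The fallback to the default view when SetView is never called can be sketched with a toy stream. Everything here is an assumption made for illustration: the viewStream type, the two-field StoredBottle, the view names "default" and "tiny", and the string rendering are hypothetical and do not reflect how the generated code marshals results.

```go
package main

import "fmt"

// StoredBottle is a hypothetical stand-in for the generated result type.
type StoredBottle struct {
	Name    string
	Vintage int
}

// viewStream is an illustrative stream that remembers the requested view.
type viewStream struct {
	view string
	out  []string
}

// SetView records the view used to render results sent afterwards.
func (s *viewStream) SetView(view string) {
	s.view = view
}

// Send renders the bottle with the recorded view, or with "default"
// when SetView was never called.
func (s *viewStream) Send(b *StoredBottle) error {
	view := s.view
	if view == "" {
		view = "default"
	}
	switch view {
	case "default":
		s.out = append(s.out, fmt.Sprintf("%s (%d)", b.Name, b.Vintage))
	case "tiny":
		s.out = append(s.out, b.Name)
	default:
		return fmt.Errorf("unknown view %q", view)
	}
	return nil
}

func main() {
	b := &StoredBottle{Name: "Number 8", Vintage: 2015}

	// SetView is never called: the default view is used.
	def := &viewStream{}
	_ = def.Send(b)

	// SetView is called before the first Send.
	tiny := &viewStream{}
	tiny.SetView("tiny")
	_ = tiny.Send(b)

	fmt.Println(def.out, tiny.out)
}
```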