クライアントサイドストリーミングの実装

GoaのStreamingPayload DSLを使用してクライアントストリーミングエンドポイントを 設計したら、次のステップはデータのストリーミングを処理するクライアントサイドロジックと、 ストリームを処理するサーバーサイドコードの両方を実装することです。このガイドでは、 Goaでストリーミングエンドポイントの両側を実装する方法を説明します。

クライアントサイドの実装

DSLでクライアントストリーミングメソッドを定義すると、Goaはクライアントが実装する 特定のストリームインターフェースを生成します。これらのインターフェースは、 ストリーミングデータをサーバーに送信することを容易にします。

クライアントストリームインターフェース

以下のような設計を想定します:

var _ = Service("logger", func() {
    Method("upload", func() {
        StreamingPayload(LogEntry)
        HTTP(func() {
            GET("/logs/upload")
            Response(StatusOK)
        })
        GRPC(func() {})
    })
})

クライアントストリームインターフェースには、データの送信とストリームの終了のための メソッドが含まれます:

// クライアントが満たすべきインターフェース
type UploadClientStream interface {
    // "LogEntry"のインスタンスをストリーミング
    Send(*LogEntry) error
    // ストリームを終了
    Close() error
}

主要なメソッド

  • Send: 指定された型(LogEntry)のインスタンスをサーバーに送信します。 このメソッドは複数回呼び出して、複数のペイロードをストリーミングできます。
  • Close: ストリームを終了します。Closeを呼び出した後、Sendはエラーを返します。

実装例

以下はクライアントサイドストリーミングエンドポイントの実装例です:

func uploadLogEntries(client *logger.Client, logEntries []*LogEntry) error {
    stream, err := client.Upload(context.Background())
    if err != nil {
        return fmt.Errorf("アップロードストリームの開始に失敗: %w", err)
    }

    for _, logEntry := range logEntries {
        if err := stream.Send(logEntry); err != nil {
            return fmt.Errorf("ログエントリの送信に失敗: %w", err)
        }
    }

    if err := stream.Close(); err != nil {
        return fmt.Errorf("ストリームの終了に失敗: %w", err)
    }

    return nil
}

エラー処理

適切なエラー処理により、堅牢なストリーミング動作を確保します:

  • 送信エラーを処理するために、常にSendの戻り値をチェックします
  • サーバーが切断されたりコンテキストがキャンセルされた場合、Sendメソッドはエラーを返します
  • デバッグのために、エラーは適切なコンテキストでラップされていることを確認します
  • 必要に応じて、一時的な障害に対するリトライロジックの実装を検討します

サーバーサイドの実装

サーバーサイドの実装には、ストリーミングデータの受信と処理が含まれます。 Goaは、受信ストリームを簡単に処理できるサーバーインターフェースを生成します。

サーバーストリームインターフェース

生成されたサーバーインターフェースには、データの受信とストリームの処理のための メソッドが含まれます:

// サーバーがストリームを受信するために使用するインターフェース
type UploadServerStream interface {
    // Recvはストリーム内の次のペイロードを返します
    Recv() (*LogEntry, error)
    // Closeはストリームを終了します
    Close() error
}

サーバー実装例

以下はサーバー側でストリームを処理する方法です:

func (s *loggerSvc) Upload(ctx context.Context, stream logger.UploadServerStream) error {
    for {
        logEntry, err := stream.Recv()
        if err == io.EOF {
            // ストリームが終了
            return nil
        }
        if err != nil {
            return fmt.Errorf("ログエントリの受信エラー: %w", err)
        }

        // 受信したログエントリを処理
        if err := s.processLogEntry(logEntry); err != nil {
            return fmt.Errorf("ログエントリの処理エラー: %w", err)
        }
    }
}

サーバーの主要な考慮事項

  1. ストリーム処理:

    • EOFまたはエラーが発生するまで、データを継続的に受信するループを使用
    • io.EOFを通常のストリーム終了条件として処理
    • 到着したデータをその場で処理
  2. リソース管理:

    • 受信データのレート制限の実装を検討
    • 大規模なストリームを処理する際のメモリ使用量を監視
    • 適切なエラー処理とロギングを実装
  3. エラー処理:

    • バリデーション失敗に対して適切なエラーを返す
    • コンテキストのキャンセルを適切に処理
    • 部分的な成功レスポンスの実装を検討

まとめ

Goaでのクライアントサイドストリーミングの実装には、データのクライアントサイド 送信とストリームのサーバーサイド処理の両方が含まれます。これらのパターンと エラー処理およびリソース管理のベストプラクティスに従うことで、APIの効率性を 向上させる堅牢なストリーミングエンドポイントを構築できます。

クライアントの実装は、データの効率的な送信とエラー処理に焦点を当て、サーバーの 実装は、ストリーミングデータの受信と処理のためのクリーンなインターフェースを 提供します。これらが組み合わさって、Goaサービスでアップロードやリアルタイム データ取り込みを処理するための強力なメカニズムを作成します。