クライアントサイドストリーミングの実装
GoaのStreamingPayload
DSLを使用してクライアントストリーミングエンドポイントを
設計したら、次のステップはデータのストリーミングを処理するクライアントサイドロジックと、
ストリームを処理するサーバーサイドコードの両方を実装することです。このガイドでは、
Goaでストリーミングエンドポイントの両側を実装する方法を説明します。
クライアントサイドの実装
DSLでクライアントストリーミングメソッドを定義すると、Goaはクライアントが実装する 特定のストリームインターフェースを生成します。これらのインターフェースは、 ストリーミングデータをサーバーに送信することを容易にします。
クライアントストリームインターフェース
以下のような設計を想定します:
var _ = Service("logger", func() {
Method("upload", func() {
StreamingPayload(LogEntry)
HTTP(func() {
GET("/logs/upload")
Response(StatusOK)
})
GRPC(func() {})
})
})
クライアントストリームインターフェースには、データの送信とストリームの終了のための メソッドが含まれます:
// クライアントが満たすべきインターフェース
type UploadClientStream interface {
// "LogEntry"のインスタンスをストリーミング
Send(*LogEntry) error
// ストリームを終了
Close() error
}
主要なメソッド
- Send: 指定された型(
LogEntry
)のインスタンスをサーバーに送信します。 このメソッドは複数回呼び出して、複数のペイロードをストリーミングできます。 - Close: ストリームを終了します。
Close
を呼び出した後、Send
はエラーを返します。
実装例
以下はクライアントサイドストリーミングエンドポイントの実装例です:
func uploadLogEntries(client *logger.Client, logEntries []*LogEntry) error {
stream, err := client.Upload(context.Background())
if err != nil {
return fmt.Errorf("アップロードストリームの開始に失敗: %w", err)
}
for _, logEntry := range logEntries {
if err := stream.Send(logEntry); err != nil {
return fmt.Errorf("ログエントリの送信に失敗: %w", err)
}
}
if err := stream.Close(); err != nil {
return fmt.Errorf("ストリームの終了に失敗: %w", err)
}
return nil
}
エラー処理
適切なエラー処理により、堅牢なストリーミング動作を確保します:
- 送信エラーを処理するために、常に
Send
の戻り値をチェックします - サーバーが切断されたりコンテキストがキャンセルされた場合、
Send
メソッドはエラーを返します - デバッグのために、エラーは適切なコンテキストでラップされていることを確認します
- 必要に応じて、一時的な障害に対するリトライロジックの実装を検討します
サーバーサイドの実装
サーバーサイドの実装には、ストリーミングデータの受信と処理が含まれます。 Goaは、受信ストリームを簡単に処理できるサーバーインターフェースを生成します。
サーバーストリームインターフェース
生成されたサーバーインターフェースには、データの受信とストリームの処理のための メソッドが含まれます:
// サーバーがストリームを受信するために使用するインターフェース
type UploadServerStream interface {
// Recvはストリーム内の次のペイロードを返します
Recv() (*LogEntry, error)
// Closeはストリームを終了します
Close() error
}
サーバー実装例
以下はサーバー側でストリームを処理する方法です:
func (s *loggerSvc) Upload(ctx context.Context, stream logger.UploadServerStream) error {
for {
logEntry, err := stream.Recv()
if err == io.EOF {
// ストリームが終了
return nil
}
if err != nil {
return fmt.Errorf("ログエントリの受信エラー: %w", err)
}
// 受信したログエントリを処理
if err := s.processLogEntry(logEntry); err != nil {
return fmt.Errorf("ログエントリの処理エラー: %w", err)
}
}
}
サーバーの主要な考慮事項
ストリーム処理:
- EOFまたはエラーが発生するまで、データを継続的に受信するループを使用
io.EOF
を通常のストリーム終了条件として処理- 到着したデータをその場で処理
リソース管理:
- 受信データのレート制限の実装を検討
- 大規模なストリームを処理する際のメモリ使用量を監視
- 適切なエラー処理とロギングを実装
エラー処理:
- バリデーション失敗に対して適切なエラーを返す
- コンテキストのキャンセルを適切に処理
- 部分的な成功レスポンスの実装を検討
まとめ
Goaでのクライアントサイドストリーミングの実装には、データのクライアントサイド 送信とストリームのサーバーサイド処理の両方が含まれます。これらのパターンと エラー処理およびリソース管理のベストプラクティスに従うことで、APIの効率性を 向上させる堅牢なストリーミングエンドポイントを構築できます。
クライアントの実装は、データの効率的な送信とエラー処理に焦点を当て、サーバーの 実装は、ストリーミングデータの受信と処理のためのクリーンなインターフェースを 提供します。これらが組み合わさって、Goaサービスでアップロードやリアルタイム データ取り込みを処理するための強力なメカニズムを作成します。