はじめに
今回は、本日の天気情報を音声でお知らせするという機能に関して、ユーザーがアプリで設定した天気お知らせ時刻の1時間前に、該当の天気情報の音声ファイルを生成してAWS S3に保存するという機能を作成したい。最終的にはミーアちゃんが喋る。これにより、ユーザーは定期的に更新される天気情報を簡単に入手できるようになる。
go言語でタスクキューを使った並列処理で実装を進める。
タスクキューを使った並列処理の実装手順
タスクキューシステムとは何か?
タスクキューの流れ
- クライアントがタスクをメッセージブローカーに送信し、それがキューに追加される。(この操作は「キューイング」(queueing)と呼ばれる)。キューの中にタスクが複数含まれる。
- そして、ワーカー(サーバー側のプロセス)がこれらのタスクをキューから「デキュー」(dequeue)し、実行していく。
メッセージブローカー
- 異なるシステム間、またはシステム内の様々なコンポーネント間でのメッセージの配信を仲介するソフトウェア。メッセージを一時的に保持するキューを提供し、それらを順番に処理する。
- Go言語でのキュー実装か、外部キューシステム(RabbitMQ, Apache Kafkaなど)を使用するの2択がある。Goの内部処理(チャネル機能を使う)として実装する方が簡単なので、今回はGoの内部処理として実装する。今後拡張する際には外部キューシステムを利用。
AWS S3への音声ファイルアップロードを行うワーカープールの実装:worker.go
Message構造体:
- キューに追加されるメッセージを表す。ユーザーIDとメッセージタイプ(例: 天気)を持つ
ScheduleTask
関数
- 指定された時刻の1時間前に実行されるタスクをスケジュールする。
- 現在時刻を取得し、1時間後の時刻を計算。
- SQLクエリを使用して、設定された時刻に合致するユーザーをデータベースから取得。
- 合致したユーザーがいれば、そのユーザーの情報でメッセージを生成し、チャネルに送信する。
ProcessMessages
関数
- キューからメッセージを取り出し、それぞれのメッセージを処理する。
- 天気情報をテキストで取得し、AWSのセッションを作成して、音声ファイルをS3にアップロードする(
SynthesizeSpeechAndUploadToS3
)。SynthesizeSpeechAndUploadToS3
関数は、google text-to-speechを用いてテキストから音声ファイルを作成し、AWS S3にアップロードする。
https://pkg.go.dev/cloud.google.com/go/texttospeech
- 天気情報は、今回は一旦直書きで。後ほど別回収と合わせて回収する。
Go
package clocky_be
import (
"log"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/jmoiron/sqlx"
)
// Message はキューに追加されるメッセージを表します。
type Message struct {
UserID string // ユーザーID
Type string // メッセージのタイプ
}
// ScheduleTask はスケジュールされたタスクを実行し、メッセージをキューに追加します。
func ScheduleTask(db *sqlx.DB, messagesChan chan Message) {
log.Println("ScheduleTask started")
// 現在の時刻を取得
currentTime := time.Now()
jst, err := time.LoadLocation("Asia/Tokyo")
if err != nil {
log.Fatalf("Failed to load the 'Asia/Tokyo' time zone: %v", err)
}
currentTimeJST := currentTime.In(jst)
log.Printf("Current time in JST: %v", currentTimeJST)
// 現在時刻から1時間後の時刻を計算
oneHourLater := currentTimeJST.Add(time.Hour)
log.Printf("One hour later in JST: %v", oneHourLater)
// SQLクエリを実行して、条件に合致するユーザーを取得
var users []User
query := `SELECT * FROM users WHERE DATE_FORMAT(weather_announcement_time, '%H:%i') = DATE_FORMAT(?, '%H:%i');`
err = db.Select(&users, query, oneHourLater.String())
if err != nil {
log.Printf("Error fetching users: %v", err)
return
}
// 合致したユーザーがいるかどうかを確認
if len(users) > 0 {
// 取得したユーザーからメッセージを生成し、チャネルに送信
for _, user := range users {
log.Printf("Scheduling task for user: %s", user.UID)
message := Message{
UserID: user.UID,
Type: "weather",
}
messagesChan <- message
}
log.Println("ScheduleTask completed")
} else {
// 合致したユーザーがいない場合のログ
log.Println("ScheduleTask skipped: No matching users found")
}
}
// processMessages はキューからメッセージを取り出し、処理します。
func ProcessMessages(messagesChan chan Message) {
for message := range messagesChan {
log.Printf("Processing message for user: %s, type: %s", message.UserID, message.Type)
if message.Type == "weather" {
// 天気情報のテキストを取得する。一旦直書きで
// 天気情報取得のAPIリクエストを廃止する回収の時に、合わせて回収する。
weatherText := "東京の本日の天気は晴れ時々曇り、最高気温は25度です。"
// AWSセッションの作成
config := NewConfig()
awsSession, err := session.NewSession(&aws.Config{
Region: aws.String(config.AWSRegion),
Credentials: credentials.NewStaticCredentials(config.AWSAccessKey, config.AWSSecretKey, ""),
})
if err != nil {
log.Fatalf("Failed to create AWS session: %v", err)
}
// 天気情報を音声ファイルに変換して、AWS S3にアップロードする
if err := SynthesizeSpeechAndUploadToS3(awsSession, config.AWSS3Bucket, weatherText, message.UserID, message.Type); err != nil {
log.Printf("Failed to synthesize and upload speech for user %s: %v", message.UserID, err)
continue
}
}
}
}
クーロンジョブ(定期的なタスクスケジューリング)の設定:main.go
- クーロンジョブはユーザーごとに設定された時刻の1時間前に、そのユーザーのタスクをキューに追加するように設定。
github.com/robfig/cron/v3
パッケージを利用して、スケジューリングを行う。
Zsh
$ go get github.com/robfig/cron/v3@v3.0.0
使い方
- 引数は最大5つまで
- 最初の引数が「秒(任意)」で、その後で、分、時間、日、月、曜日と続く。
cron.Descriptor
は特別な記述子(例:@every 1h30m
)をサポートすることを意味する- 秒単位のサポートを有効にするには、
cron.New(cron.WithSeconds())
を使用してCronスケジューラを初期化する必要がある。
Go
// Seconds field, required
cron.New(cron.WithSeconds())
// Seconds field, optional
cron.New(cron.WithParser(cron.NewParser(
cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)))
Go
package main
import (
"log"
clocky "github.com/EarEEG-dev/clocky_be"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/robfig/cron/v3"
)
func main() {
config := clocky.NewConfig()
db, err := sqlx.Open("mysql", config.DSN)
if err != nil {
log.Fatalf("unable to connect to database: %v", err)
}
if err := db.Ping(); err != nil {
log.Fatalf("unable to ping database: %v", err)
}
log.Printf("connected to database")
// メッセージチャネルとCronジョブの設定
messagesChan := make(chan clocky.Message)
// 秒単位をサポートしてcronを初期化
c := cron.New(cron.WithSeconds())
// 毎分0秒時点で起動
c.AddFunc("0 * * * * *", func() {
log.Println("Cron job triggered")
clocky.ScheduleTask(db, messagesChan)
})
go c.Start()
// 処理関数の起動数を増やして並列に処理させることも可能
go clocky.ProcessMessages(messagesChan)
// gRPCサーバー
grpcServer := clocky.NewGRPCServer(config, db)
go grpcServer.Start(":50051")
// Rest API
server := clocky.NewServer(config, db)
server.Start()
}
検証
アプリ側で下記のように天気お知らせ時刻を設定。
すると、天気お知らせ時刻の1時間前になると(今回だと17:53)、cron jobでトリガーされたタスクが起動し、実際にAWS S3に音声ファイルが保存(bucket name→user id→weatherディレクトリ)された。
また、他の時刻の場合には、毎分Cron jobはトリガーされるが、天気お知らせ時刻の1時間前ではないので、実際のタスクはスキップされ(つまり重複で音声ファイルを生成しない)、そのログも保存された。
コメント