方言を話すおしゃべり猫型ロボット『ミーア』をリリースしました(こちらをクリック)

【Go】タスクキューとクーロンを使った並列処理の実装

この記事は約11分で読めます。

はじめに

今回は、本日の天気情報を音声でお知らせするという機能に関して、ユーザーがアプリで設定した天気お知らせ時刻の1時間前に、該当の天気情報の音声ファイルを生成してAWS S3に保存するという機能を作成したい。最終的にはミーアちゃんが喋る。これにより、ユーザーは定期的に更新される天気情報を簡単に入手できるようになる。

go言語でタスクキューを使った並列処理で実装を進める。

タスクキューを使った並列処理の実装手順

タスクキューシステムとは何か?

タスクキューの流れ

  • クライアントがタスクをメッセージブローカーに送信し、それがキューに追加される。(この操作は「キューイング」(queueing)と呼ばれる)。キューの中にタスクが複数含まれる。
  • そして、ワーカー(サーバー側のプロセス)がこれらのタスクをキューから「デキュー」(dequeue)し、実行していく。

メッセージブローカー

  • 異なるシステム間、またはシステム内の様々なコンポーネント間でのメッセージの配信を仲介するソフトウェア。メッセージを一時的に保持するキューを提供し、それらを順番に処理する。
  • Go言語でのキュー実装か、外部キューシステム(RabbitMQ, Apache Kafkaなど)を使用するの2択がある。Goの内部処理(チャネル機能を使う)として実装する方が簡単なので、今回はGoの内部処理として実装する。今後拡張する際には外部キューシステムを利用。

AWS S3への音声ファイルアップロードを行うワーカープールの実装:worker.go

Message構造体:

  • キューに追加されるメッセージを表す。ユーザーIDとメッセージタイプ(例: 天気)を持つ

ScheduleTask 関数

  • 指定された時刻の1時間前に実行されるタスクをスケジュールする。
  • 現在時刻を取得し、1時間後の時刻を計算。
  • SQLクエリを使用して、設定された時刻に合致するユーザーをデータベースから取得。
  • 合致したユーザーがいれば、そのユーザーの情報でメッセージを生成し、チャネルに送信する。

ProcessMessages 関数

  • キューからメッセージを取り出し、それぞれのメッセージを処理する。
  • 天気情報をテキストで取得し、AWSのセッションを作成して、音声ファイルをS3にアップロードする(SynthesizeSpeechAndUploadToS3)。SynthesizeSpeechAndUploadToS3関数は、google text-to-speechを用いてテキストから音声ファイルを作成し、AWS S3にアップロードする。

https://pkg.go.dev/cloud.google.com/go/texttospeech

  • 天気情報は、今回は一旦直書きで。後ほど別回収と合わせて回収する。
Go
package clocky_be

import (
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/jmoiron/sqlx"
)

// Message はキューに追加されるメッセージを表します。
type Message struct {
	UserID string // ユーザーID
	Type   string // メッセージのタイプ
}

// ScheduleTask はスケジュールされたタスクを実行し、メッセージをキューに追加します。
func ScheduleTask(db *sqlx.DB, messagesChan chan Message) {
	log.Println("ScheduleTask started")
	// 現在の時刻を取得
	currentTime := time.Now()
	jst, err := time.LoadLocation("Asia/Tokyo")
	if err != nil {
		log.Fatalf("Failed to load the 'Asia/Tokyo' time zone: %v", err)
	}
	currentTimeJST := currentTime.In(jst)
	log.Printf("Current time in JST: %v", currentTimeJST)

	// 現在時刻から1時間後の時刻を計算
	oneHourLater := currentTimeJST.Add(time.Hour)
	log.Printf("One hour later in JST: %v", oneHourLater)

	// SQLクエリを実行して、条件に合致するユーザーを取得
	var users []User
	query := `SELECT * FROM users WHERE DATE_FORMAT(weather_announcement_time, '%H:%i') = DATE_FORMAT(?, '%H:%i');`
	err = db.Select(&users, query, oneHourLater.String())

	if err != nil {
		log.Printf("Error fetching users: %v", err)
		return
	}
	// 合致したユーザーがいるかどうかを確認
	if len(users) > 0 {
		// 取得したユーザーからメッセージを生成し、チャネルに送信
		for _, user := range users {
			log.Printf("Scheduling task for user: %s", user.UID)
			message := Message{
				UserID: user.UID,
				Type:   "weather",
			}
			messagesChan <- message
		}
		log.Println("ScheduleTask completed")
	} else {
		// 合致したユーザーがいない場合のログ
		log.Println("ScheduleTask skipped: No matching users found")
	}
}

// processMessages はキューからメッセージを取り出し、処理します。
func ProcessMessages(messagesChan chan Message) {
	for message := range messagesChan {
		log.Printf("Processing message for user: %s, type: %s", message.UserID, message.Type)
		if message.Type == "weather" {
			// 天気情報のテキストを取得する。一旦直書きで
			// 天気情報取得のAPIリクエストを廃止する回収の時に、合わせて回収する。
			weatherText := "東京の本日の天気は晴れ時々曇り、最高気温は25度です。"

			// AWSセッションの作成
			config := NewConfig()
			awsSession, err := session.NewSession(&aws.Config{
				Region:      aws.String(config.AWSRegion),
				Credentials: credentials.NewStaticCredentials(config.AWSAccessKey, config.AWSSecretKey, ""),
			})
			if err != nil {
				log.Fatalf("Failed to create AWS session: %v", err)
			}

			// 天気情報を音声ファイルに変換して、AWS S3にアップロードする
			if err := SynthesizeSpeechAndUploadToS3(awsSession, config.AWSS3Bucket, weatherText, message.UserID, message.Type); err != nil {
				log.Printf("Failed to synthesize and upload speech for user %s: %v", message.UserID, err)
				continue
			}
		}
	}
}

クーロンジョブ(定期的なタスクスケジューリング)の設定:main.go

  • クーロンジョブはユーザーごとに設定された時刻の1時間前に、そのユーザーのタスクをキューに追加するように設定。
  • github.com/robfig/cron/v3 パッケージを利用して、スケジューリングを行う。
Zsh
$ go get github.com/robfig/cron/v3@v3.0.0

使い方

  • 引数は最大5つまで
  • 最初の引数が「秒(任意)」で、その後で、分、時間、日、月、曜日と続く。
  • cron.Descriptor は特別な記述子(例:@every 1h30m)をサポートすることを意味する
  • 秒単位のサポートを有効にするには、cron.New(cron.WithSeconds()) を使用してCronスケジューラを初期化する必要がある。
Go
// Seconds field, required
cron.New(cron.WithSeconds())

// Seconds field, optional
cron.New(cron.WithParser(cron.NewParser(
	cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)))
Go
package main

import (
	"log"

	clocky "github.com/EarEEG-dev/clocky_be"
	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"github.com/robfig/cron/v3"
)

func main() {
	config := clocky.NewConfig()

	db, err := sqlx.Open("mysql", config.DSN)
	if err != nil {
		log.Fatalf("unable to connect to database: %v", err)
	}

	if err := db.Ping(); err != nil {
		log.Fatalf("unable to ping database: %v", err)
	}
	log.Printf("connected to database")

	// メッセージチャネルとCronジョブの設定
	messagesChan := make(chan clocky.Message)

	// 秒単位をサポートしてcronを初期化
	c := cron.New(cron.WithSeconds())

	// 毎分0秒時点で起動
	c.AddFunc("0 * * * * *", func() {
		log.Println("Cron job triggered")
		clocky.ScheduleTask(db, messagesChan)
	})
	go c.Start()

	// 処理関数の起動数を増やして並列に処理させることも可能
	go clocky.ProcessMessages(messagesChan)

	// gRPCサーバー
	grpcServer := clocky.NewGRPCServer(config, db)
	go grpcServer.Start(":50051")

	// Rest API
	server := clocky.NewServer(config, db)
	server.Start()

}

検証

アプリ側で下記のように天気お知らせ時刻を設定。

すると、天気お知らせ時刻の1時間前になると(今回だと17:53)、cron jobでトリガーされたタスクが起動し、実際にAWS S3に音声ファイルが保存(bucket name→user id→weatherディレクトリ)された。

また、他の時刻の場合には、毎分Cron jobはトリガーされるが、天気お知らせ時刻の1時間前ではないので、実際のタスクはスキップされ(つまり重複で音声ファイルを生成しない)、そのログも保存された。

コメント

タイトルとURLをコピーしました