BigQuery Data Transfer Service のイベント ドリブン転送
エヌデーデーの関口です。あるお客様からCloud Storageに置いたCSVファイルを自動的にBigQueryに読み込みたいというご要望がありました。
これまでであれば Cloud Run functions の Cloud Storage イベントをトリガーに、ファイルをBigQueryにロードさせればよいと思っていたのですが、リリースノートを眺めていたところ BigQuery Data Transfer Service のイベント ドリブン転送という機能がGAになったとありましたので、試してみました。
BigQuery Data Transfer Service
BigQuery Data Transfer Service(以下DTS)は、あらかじめ設定されたスケジュールに基づいてさまざまなデータソースからBigQueryに自動的にデータを読み込ませることができるサービスです。
対象のデータソースは、Google Cloud Storage はもちろん、Amazon S3、Azure Blob Storage の他、プレビューではありますが Salesforce などからも取込が可能となっています。
イベント ドリブン転送
https://cloud.google.com/bigquery/docs/event-driven-transfer
文字通りイベントを検知して、BigQueryにデータを取り込む機能になっています。ただし、次のような制限事項もあります。
- イベントドリブン転送がトリガーされた後、BigQuery Data Transfer Service は、その時間内にイベントが到着したかどうかに関係なく、最大 10 分間待機してから次の転送実行をトリガーします。
- イベント ドリブン転送は、ソース URI またはデータパスのランタイム パラメータをサポートしていません。
- 同じ Pub/Sub サブスクリプションを複数のイベントドリブン転送構成で再利用することはできません。
これらの制限事項が許容できれば利用の検討をしてみてください。
事前準備
BigQuery DTS の有効化
Cloud Console上の検索ボックスにBigQuery Data Transfer API
と入力、検索してAPI設定画面に遷移し、有効化します。

あるいは Cloud Shell で次のコマンドを入力します。
gcloud services enable bigquerydatatransfer.googleapis.com
サービス エージェントの確認
APIを有効化した後、Cloud Consoleから「BigQuery」>「データ転送」と遷移するとDTSの画面が表示されています。

ここで一度、IAM メニューに遷移してサービス エージェントの状態を確認してみます。
Cloud Consoleから「IAMと管理」>「IAM」と遷移し、「Google 提供のロール付与を含める」のチェックを入れます。

するとservice-xxxxxxxx@gcp-sa-bigquerydatatransfer.iam.gserviceaccount.com
というプリンシパルが表示されます。(x部分はプロジェクト番号)

サービス エージェントはGoogle Cloud内のサービスにおいて、ユーザーの代わって他のリソースにアクセスするために用意されるものです。後ほどこのサービス エージェントに対してロールを付与する必要があるので覚えておいてください。
Pub/Sub の有効化
イベント ドリブン転送は Pub/Sub 通知を使用しています。そのため Pub/Sub のAPIを利用可能にしておきます。
Cloud Console上の検索ボックスにCloud Pub/Sub API
と入力、検索してAPI設定画面に遷移し、有効化します。

あるいは Cloud Shell で次のコマンドを入力します。
gcloud services enable pubsub.googleapis.com
Cloud Storage から Pub/Sub への通知を構成
この操作は Cloud Console からは行えないため Cloud Shell で次のコマンドを入力します。
下記の例では、指定したバケット内にexample/data-
という接頭文字のオブジェクトが作成された(上書きされた)ときに通知を発報するというトピックを作ります。
※ object-prefix
を指定しない場合は、バケット内のすべてのオブジェクトが対象となります。
BUCKET_NAME="通知を行うのバケット名"
TOPIC_NAME="作成するPub/Subのトピック名"
OBJECT_PREFIX="example/data-"
gcloud storage buckets notifications create gs://${BUCKET_NAME} \
--object-prefix=${OBJECT_PREFIX} \
--topic=${TOPIC_NAME} --event-types=OBJECT_FINALIZE \
すると次のような出力でトピックが作成されたことが分かります。
etag: '1'
event_types:
- OBJECT_FINALIZE
id: '1'
kind: storage#notification
object_name_prefix: example/data-
payload_format: JSON_API_V1
selfLink: https://www.googleapis.com/storage/v1/b/BUCKET_NAME/notificationConfigs/1
topic: //pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_NAME
サブスクリプションを作成
先ほどのトピック(通知)に対する pull サブスクリプション(購読)を作ります。こちらも Cloud Shell で作りましょう。
SUBSCRIPTION="サブスクリプション名"
gcloud pubsub subscriptions create ${SUBSCRIPTION} --topic=${TOPIC_NAME}
サービス エージェント にロール付与
先ほど確認したサービス エージェント serivce-xxxxxxxx@gcp-sa-bigquerydatatransfer.iam.gserviceaccount.com
がユーザーに代わって、Pub/Sub から通知を受け取れるようにします。
Cloud Consoleから「IAMと管理」>「IAM」と遷移し、「Google 提供のロール付与を含める」のチェックを入れます。
目的のサービス エージェントが表示されている行の右の編集マークから、ロール割り当て画面に遷移します。Pub/Sub サブスクライバー
ロールを追加します。
※(注意) Pub/Sub Lite サブスクライバーというロールではありません。

BigQuery DTSの構成
ここから BigQuery DTS の構成に入ります。
なお BigQuery のデータソースには、転送先となるテーブルが作成済みである事が前提となります。
今回は次のようなテーブルcustomer_table
を用意しておきました。
CREATE TABLE dataset.customer_table (
customer_id STRING,
customer_name STRING
);
Cloud Console から「BigQuery」>「データ転送」と進み「転送を作成」をクリックします。
書く設定値は次のように入力してください。
- ソースのタイプ: Google Cloud Storage
- スケジュール オプション: イベント ドリブン
- Pub/Sub サブスクリプション: 先ほど作成したサブスクリプションをドリップダウンから選択

続いて転送先の設定も入力します。
- データセット: 転送先のデータセット名
- Destination table: 転送先テーブル(今回の場合は
customer_table
) - Cloud Storage URI: 取込元になる CSV が置いてある Cloud Storage の場所を示します。今回はプリフィックスで
example/data-
から始まる CSV を取り込むので、バケット名/exmaple/data-*.csv
という具合にワイルドカードで指定をします - Write preference: APPEND(追加) または MIRROR(上書き)から選択します
- File format: 今回のケースは CSV とします

この他、取込ファイルの種類ごとの設定などを入力します。
最後に通知オプションを次の二つから設定できます。
- メール通知: この DTS を作成したユーザーのメールにのみ、実行と失敗の通知が届きます
- Pub/Sub 通知: 特定のトピックに対してメッセージを通知できます。通知を受け取ってさらに別の宛先に通知するなどの追加実装が可能となります。
最後に「保存」して完了です。

動作確認
次のような CSV を3つ用意しました。
data-001.csv
"CUSTOMER_ID","NAME"
"C1001","山田商会"
"C1002","田中商事"
"C1003","伊藤通商"
data-002.csv
"CUSTOMER_ID","NAME"
"C2004","鈴木物産"
"C2005","斉藤貿易"
"C2006","渡辺産業"
data-003.csv
"CUSTOMER_ID","NAME"
"C3007","山本興業"
"C3008","中村流通"
"C3009","吉田交易"
"C3010","髙橋物流"
ファイルを1つだけアップしてみる
まずは、data-001.csv だけを Cloud Storage にアップロードします。

しばらくすると、作成したデータ転送の詳細画面で次のように表示されます。
ファイルのアップロードから取込時間の差はおおよそ5分というところですね。


ファイルを2つ同時にアップしてみる
次に data-002.csv と data-003.csv を同時にアップロードしてみます。




読み込み対象外のファイルをアップしてみる
データは先の CSV に類似しているが、ファイル名が data-004.txt というファイルをアップロードしてみます。
このファイルは Cloud Storage の Pub/Sub 通知の条件(–object-prefix=example/data-)には当てはまっていますが、DTS の読込対象の条件(GCSのURI gs://BUCKET_NAME/example/data-*.csv)とは合致していません。

DTSの処理状況を見ると、こちらも5分経過後に処理が完了してる様子でした。

処理は完了しているのですが、ログを見るとファイルは対象外となったようです。

BigQuery にもデータは取り込まれていませんでした。
通知はどうなるのか?
DTS の通知設定では、メール通知を ON にしていました。
しかし、成功した通知はメールが届きませんでした。
敢えてエラーになるファイルをアップロードしてみたところ、次のようなメールを受信しました。
ただし、エラーの詳細な理由までは記されていませんでした。

先に書いたようにメールの通知は、あくまで当該 DTS を作成したユーザーにのみ通知される仕組みです。
もう一つの方法として Pub/Sub での通知を受けて他の処理にまかせるということもできます。
しかし、実際の運用では Logging のログベースのアラート ポリシーを構成するという方法が一番簡単ではないかと思います。
まとめ
BigQuery Data Transfer Service のイベント ドリブンは Cloud Storage からニアリアルタイムで、比較的簡単にデータ取込の仕組みを行うのに便利です。
なお、より複雑な要件であれば、Cloud Run functions など他のサービスを利用することを検討してください。