はじめに
AWS Community Builderのぺんぎん(@jitepengin)です。
AWS上でデータパイプラインや業務システムを構築していると、アプリケーション処理の一部としてSnowflakeに直接アクセスしたいケースが発生する場合があります。
例えば、下記のようなユースケースです。
- API経由で受け取ったデータをSnowflakeにロードする
- ETL / ELT 処理の完了後にSnowflake側でSQLを実行する
- 外部システム連携の結果をSnowflakeに書き込む
- タスク実行のトリガーとしてSnowflakeのストアドプロシージャを呼び出す
AWS Lambdaはサーバーレスでイベント駆動処理と相性が良く、Snowflakeと組み合わせることで柔軟なデータ連携基盤を構築できます。
一方で、LambdaからSnowflakeに接続する際には、認証情報をどのように安全に管理するかが非常に重要です。
2026年現在、Snowflake のシステム連携ではKey Pair認証またはOAuthが標準となっており、パスワードを直接埋め込む方式は推奨されません。
今回は、AWS Secrets Managerを利用して秘密鍵を安全に管理し、LambdaからSnowflakeへKey Pair認証で接続する実装パターンを解説したいと思います。
参考:単一要素パスワードサインインの廃止計画
https://docs.snowflake.com/ja/user-guide/security-mfa-rollout
そもそも、なぜLambdaからSnowflakeにアクセスするのか
まず、そもそも、なぜLambdaからSnowflakeにアクセスする必要があるのかを考えてみたいと思います。
ありがちなユースケース3パターンで説明します。
ケース1:イベント駆動のデータ投入
最も多いのはこのパターンかなと思います。
S3 / API Gateway / EventBridge
↓
Lambda
↓
Snowflake
例えば、
- APIで受け取った注文データ
- S3にアップロードされたCSV/JSON
- SaaS連携イベント
をLambdaで受け取り、そのままSnowflakeにロードします。
Snowpipeを使う構成もありますが、
- 前処理が必要
- フォーマット変換が必要
- 複数システム連携がある
場合はLambdaの方が扱いやすい場面が多いと思います。
ケース2:AWS ETL完了後のSnowflake連携
このパターンはAWS側でETLを実行した後に、Lambda経由でSnowflakeのSQLを実行するパターンです。
データ基盤としてIcebergを利用している場合を例としています。
Glue / Lambda
↓
S3 Iceberg
↓
Lambda
↓
Snowflake SQL実行
例えば、
- AWS側でETL完了
- Snowflake側でREFRESH
- MERGE SQL実行
- データ品質チェック
- BI用マート更新
といった後続処理を行う場面です。
ケース3:オペレーション自動化
このパターンも実務ではあると思います。
例えば、
- warehouseの起動/停止
- task 実行
- SQL自動実行
といった動作をLambdaから自動化します。
アーキテクチャ
今回の構成は以下です。
Event Source
↓
AWS Lambda
↓
Secrets Manager
↓
Private Key取得
↓
Snowflake
LambdaはSecrets Managerから秘密鍵を取得し、Key Pair認証でSnowflakeに接続します。
セキュリティ上のベストプラクティス
1.環境変数に秘密鍵を置かない
今回のSnowflakeに限らず重要なものとなります。
やりがちなアンチパターンとしては、下記のように環境変数に設定するものです。
PRIVATE_KEY = os.environ["PRIVATE_KEY"]
これは絶対に避けるべきで、セキュリティの懸念の他、下記のような懸念があります。
- 誤ってログ出力される可能性
- 設定変更時の運用負荷
- ローテーションしにくい
- 権限制御が弱い
2.Secrets Managerで一元管理
こういった機密情報の取り扱いはSecrets Managerで一元管理するのが良いと思います。
Secrets Managerを使うことで下記のようなメリットがあります。
- IAMベースのアクセス制御が可能
- KMS暗号化が可能
- 監査ログ(CloudTrail)の管理が可能
- ローテーション運用が可能
3.IAM最小権限
Lambdaには必要最小限の権限のみ付与します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "secretsmanager:GetSecretValue",
"Resource": "arn:aws:secretsmanager:ap-northeast-1:123456789012:secret:snowflake-keypair-*"
}
]
}
4.Snowflakeユーザーはサービス専用のユーザーを使用する。
個人ユーザーを使わず、サービス専用ユーザーを作成します。
サービス用のユーザはTYPE = 'LEGACY_SERVICE' または TYPE = 'SERVICE' で作成してください。
認証情報の登録
1.秘密鍵の作成
まずは、秘密鍵を作成します。
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
2.Snowflake側設定
公開鍵をユーザーに登録します。
ALTER USER lambda_service_user
SET RSA_PUBLIC_KEY='MIIBIjANBgkq...';
公開鍵はrsa_key.pubファイルの内容を使用します。
3.Secrets Manager登録
秘密鍵を以下のように保存します。
{
"account": "XXXXXXXXXX",
"user": "lambda_service_user",
"privateKey": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----",
"passphrase": "",
"role": "LAMBDA_ROLE",
"warehouse": "COMPUTE_WH",
"database": "ICEBERGDB",
"schema": "PUBLIC"
}
※accountにはSnowflakeのaccount identifier、userにはSnowflakeの接続ユーザー、privateKeyには1.秘密鍵の作成で作成した秘密鍵(rsa_key.p8)を設定してください。
account identifierは<orgname>-<account_name>形式が推奨となります。
これで準備が整いました。
Lambdaサンプルコード
今回は確認のために SELECT CURRENT_VERSION() を実行するだけの簡単な処理とします。
ポイントはsnowflake.connectorを使用する部分となります。
こちらを活用することで簡単にSnowflakeとの接続が可能となります。
snowflake.connectorはLambda Layerで追加するか、Lambda自体をコンテナ形式で作成してください。
import json
import boto3
import snowflake.connector
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import time
import random
SECRET_ID = "snowflake-keypair"
def get_secret():
client = boto3.client("secretsmanager")
response = client.get_secret_value(SecretId=SECRET_ID)
return json.loads(response["SecretString"])
def execute_query_with_retry(conn, query, max_retries=3):
retry_count = 0
while retry_count < max_retries:
try:
cur = conn.cursor()
cur.execute(query)
result = cur.fetchone()
cur.close()
return result
except snowflake.connector.Error as e:
print(f"Query failed (attempt {retry_count+1}/{max_retries}): {e}")
retry_count += 1
sleep_time = min(2 ** retry_count, 30) + random.random()
time.sleep(sleep_time)
raise Exception("Query execution exceeded max retries.")
def lambda_handler(event, context):
secret = get_secret()
private_key_obj = serialization.load_pem_private_key(
secret["privateKey"].encode("utf-8"),
password=None,
backend=default_backend()
)
conn = None
try:
conn = snowflake.connector.connect(
user=secret["user"],
account=secret["account"],
private_key=private_key_obj,
role=secret.get("role"),
warehouse=secret.get("warehouse"),
database=secret.get("database"),
schema=secret.get("schema")
)
result = execute_query_with_retry(conn, "SELECT CURRENT_VERSION()")
return {
"statusCode": 200,
"body": str(result)
}
except Exception as e:
print(f"Lambda execution error: {e}")
return {
"statusCode": 500,
"body": str(e)
}
finally:
if conn:
conn.close()
実行結果
{
"statusCode": 200,
"body": "('10.11.2',)"
}
Snowflake側で実行
SELECT CURRENT_VERSION()
>10.11.2
結果が一致したので、うまく接続できました!
その他留意事項
ネットワークセキュリティの強化
Snowflakeに接続する際に、AWS PrivateLink経由でプライベートネットワーク内で通信する構成にすると、インターネット経由の通信リスクを排除できます。
下記の手順で対応可能です。
- Snowflakeでプライベート接続を有効化
- VPCエンドポイントを作成(サービスプリンシパル: com.amazonaws.vpce.snowflake)
- AWSでVPCエンドポイントを作成し、Snowflakeのプリンシパルとペアリング
- Lambdaを同じVPCに配置し、セキュリティグループでSnowflakeエンドポイントへのみアクセスを許可
エラーハンドリング
サンプルコードでも実装していますが、ネットワーク障害や一時的なSnowflakeの障害に耐えるため、接続失敗時のリトライ処理を実装するのが良いと思います。
リトライ処理では固定遅延時間が使用されていますが、指数バックオフを採用すると、ネットワーク障害への耐性が向上します。
監視
下記のような観点で監視すると良いと思います。
- CloudWatch Logs: 接続エラーやクエリ実行失敗を捕捉
- CloudTrail: Secrets Managerへのアクセスログを確認
- Snowflake監査ログ: ユーザーの接続状況を監視
まとめ
今回はAWS LambdaからSnowflakeにアクセスする際のKey Pair認証の鍵管理から接続までを紹介しました。
AWS LambdaからSnowflakeへアクセスするユースケースは意外と多くあり、イベント駆動処理やETL後続処理で特に有効です。
そして、Snowflakeに限らず、認証情報の安全管理は非常に重要となります。
現時点では、Snowflakeへの接続に際し、Key Pair認証+Secrets Managerが標準的な実装パターンになると思います。
今回の記事が、AWSとSnowflakeを接続するアプリケーション構築の参考になれば幸いです。
