Oracle Data Transforms - Python SDK - データ・フローの作成およびスケジュール (2025/08/31)
Oracle Data Transforms - Python SDK - データ・フローの作成およびスケジュール (2025/08/31)
投稿者:Jerome Francoisse | Consulting Solution Architect
Oracle Data Transforms は、強力な Python SDK を搭載し、データパイプラインの構築、移行、保守、スケジュール設定など、データワークフローの自動化と一貫性を容易に実現します。以前、Oracle Data Transforms への最初の接続スクリプトを含む導入およびインストールガイドを公開しました。このブログを読む前に、必ずそちらのブログもご覧ください 。
それでは、新しいデータ エンティティまたはインポートされたデータ エンティティの追加、そして最終的な実行のスケジュール設定など、データ フローの作成手順を見ていきましょう。
データフローの作成
モジュールのインポートとOracle Data Transformsインスタンスへの接続
まず、必要なモジュールをインポートして接続を作成しましょう。認証のために、ワークベンチのコンストラクタにパスワードを安全に渡します。理想的には、パスワードはハードコードではなく、Vault またはシークレットマネージャーから取得する必要があります。
from datatransforms.workbench import DataTransformsWorkbench,WorkbenchConfig from datatransforms.dataflow import DataFlow from datatransforms.dataentity import DataEntity from datatransforms.dataflow_load_options import OracleInsert,DataFlowIntegrationType from datatransforms.dataflow import DataFlow,Project,AggregateAttribute pswd="<your_data_transforms_password>" # ideally retrieved from a vault connect_params = WorkbenchConfig.get_workbench_config(pswd) workbench = DataTransformsWorkbench() workbench.connect_workbench(connect_params) workbench.get_all_connections()
最後の行はすべての接続を取得するので、これらの接続の新しいデータ エンティティをインポートまたは作成できるようになります。
データエンティティの作成とインポート
このブログでは、 https ://objectstorage.us-ashburn-1.oraclecloud.com/n/c4u04/b/moviestream_landing/o で入手可能なmoviestreamデータセットを使用します 。目標は、ロイヤルカスタマーチーム向けに月次レポートを作成し、最もロイヤルティの高い顧客セグメントと、成長の可能性のあるセグメントを特定することです。具体的には、各セグメントの長期購読者数を算出します。CUSTOMERデータエンティティは既に存在しますが、ソースとしてCUSTOMER_SEGMENTをインポートし、出力ターゲットとしてLONG_TERM_SUBSCRIBER_SEGMENTを作成する必要があります。
既存のデータベース テーブルから新しいデータ エンティティを作成するには、Oracle Data Transforms で定義されている接続名、データベース スキーマ、およびテーブルの名前を指定する必要があります。
# Create a new Data Entity from an existing database table
cust_seg = DataEntity().from_connection("adb23a1","JEROMEFR").entity_name("CUSTOMER_SEGMENT")
workbench.save_data_entity(cust_seg)
手動で列を追加することで、データエンティティを最初から作成することもできます。データベーステーブルは、後でデータフローを最初に実行したときに作成されます。
# CREATE the target Data Entity target_table = DataEntity().from_connection("adb23a1","JEROMEFR").entity_name("LONG_TERM_SUBSCRIBER_SEGMENT") target_table.add_column(name="SEGMENT_ID",position=1,dataType="NUMBER",dataTypeCode="NUMBER",length=38,scale=-0) target_table.add_column(name="SEGMENT_NAME",position=2,dataType="VARCHAR2",dataTypeCode="VARCHAR2",length=500,scale=0) target_table.add_column(name="SEGMENT_SHORTNAME",position=3,dataType="VARCHAR2",dataTypeCode="VARCHAR2",length=200,scale=0) target_table.add_column(name="LONG_TERM_SUBSCRIBERS",position=4,dataType="NUMBER",dataTypeCode="NUMBER",length=38,scale=-0) workbench.save_data_entity(target_table)
データフローの作成とデータエンティティの追加
Moviestreamというプロジェクトに新しいデータフローを作成します。オプションでフォルダを追加することもできます。今回の場合、プロジェクトとフォルダはまだOracle Data Transformsに存在しませんが、以下のコードで作成できます。
df = DataFlow("DF_LONG_TERM_SUBSCRIBER_SEGMENT","MOVIESTREAM", "MONTHLY_REPORT")
次に、次の構文を使用して、ソース データ エンティティとターゲット データ エンティティの両方をデータ フローに追加します。
source_table=df.use(connection_name="adb23a1",schema_name="JEROMEFR",data_entity_name="CUSTOMER")
source_table=df.use(connection_name="adb23a1",schema_name="JEROMEFR",data_entity_name="CUSTOMER_SEGMENT")
target_table=df.use(connection_name="adb23a1",schema_name="JEROMEFR",data_entity_name="LONG_TERM_SUBSCRIBER_SEGMENT")
集計変換のプロパティの準備
各セグメントの顧客数をカウントするため、データフローにAggregate Transformを追加する必要があります。まず、Aggregateコンポーネントの属性を定義するためのリストを作成します。各属性について、データ型とGROUP BY句に含まれるかどうかを指定します。
agg_attr=[] agg_attr.append(AggregateAttribute("SEGMENT_ID","NUMBER",20,"CUSTOMER_SEGMENT.SEGMENT_ID", is_group_by="YES")) agg_attr.append(AggregateAttribute("SEGMENT_NAME","VARCHAR2",500,"CUSTOMER_SEGMENT.NAME", is_group_by="YES")) agg_attr.append(AggregateAttribute("SEGMENT_SHORTNAME","VARCHAR2",200,"CUSTOMER_SEGMENT.SHORT_NAME", is_group_by="YES")) agg_attr.append(AggregateAttribute("LONG_TERM_SUBSCRIBERS","NUMBER",38,"COUNT(CUSTOMER.CUST_ID)", is_group_by="NO"))
ここで、これらの各属性をソース テーブルの列または集計式 (この場合は COUNT(CUSTOMER.CUST_ID)) にマップするためのディクショナリを作成する必要があります。
agg_map={} agg_map["COUNT_SUBS.SEGMENT_ID*"]="CUSTOMER_SEGMENT.SEGMENT_ID" agg_map["COUNT_SUBS.SEGMENT_NAME*"]="CUSTOMER_SEGMENT.NAME" agg_map["COUNT_SUBS.SEGMENT_SHORTNAME*"]="CUSTOMER_SEGMENT.SHORT_NAME" agg_map["COUNT_SUBS.LONG_TERM_SUBSCRIBERS"]="COUNT(CUSTOMER.CUST_ID)"
積載戦略の選択
すべてのコンポーネントをデータフローに追加する前に、読み込み戦略を選択する必要があります。追加、マージ、削除して再作成、欠落している場合は作成、ターゲットの切り捨てのいずれかを選択できます。ここでは、必要に応じて作成し、各ロードの前に切り捨てを行います。
#Define the load table properties. The columns are auto mapped OracleInsert.CREATE_TARGET_TABLE = True OracleInsert.TRUNCATE_TARGET_TABLE = True load_options=DataFlowIntegrationType.append() load_options.update(OracleInsert.options())
変換を追加してデータフローを保存する
これで、データエンティティとトランスフォームをデータフローに追加できます。まず、既存のデータエンティティ(CUSTOMER)と新しくインポートしたデータエンティティ(CUSTOMER_SEGMENT)の2つをソースデータエンティティとして追加します。次に、両方のテーブルのSEGMENT_IDを使用してこれらを結合し、3年以上サブスクリプション契約している顧客をフィルタリングします。COUNT_SUBSという集計関数を追加し、属性リストとマッピング辞書を渡します。最後に、ターゲットテーブルを定義し、先ほど定義した読み込みオプションを渡します。
#Define the flow of transforms in the data flow df.from_source("CUSTOMER","adb23a1.JEROMEFR.CUSTOMER").\ from_source("CUSTOMER_SEGMENT","adb23a1.JEROMEFR.CUSTOMER_SEGMENT").\ join("Join","INNER","CUSTOMER.SEGMENT_ID=CUSTOMER_SEGMENT.SEGMENT_ID").\ filter_by("Filter","CUSTOMER.YRS_CUSTOMER >= 3").\ aggregate("COUNT_SUBS", "", "",agg_attr,agg_map).\ load("LONG_TERM_SUBSCRIBER_SEGMENT","adb23a1.JEROMEFR.LONG_TERM_SUBSCRIBER_SEGMENT",load_options)
最後に、create() コマンドを実行して、データフローを Oracle Data Transforms リポジトリに保存します。
df.create()
このコードを実行した後、Oracle Data Transforms Web UI で結果を確認し、2 つの新しいデータ エンティティ CUSTOMER_SEGMENT と LONG_TERM_SUBSCRIBER_SEGMENT が存在することを確認できます。
プロジェクトページに移動して、データフローを含む新しいMOVIESTREAMプロジェクトを確認しましょう。開いて内容を確認できます。
データ フローが構成され保存されたので、次のステップではそれを実行し、ターゲット テーブルにデータを入力します。
データフローのスケジュール
データ フローのスケジュールを作成するには、新しい Python スクリプトを作成し、必要なモジュールをインポートしてセッションを開始します。
from datatransforms.schedule import Schedule,SCHEDULE_STATUS_ACTIVE, MonthlyDayOfMonthSchedule from datatransforms.workbench import DataTransformsWorkbench,WorkbenchConfig pswd="<your_data_transforms_password>" # ideally retrieved from a vault connect_params = WorkbenchConfig.get_workbench_config(pswd) workbench = DataTransformsWorkbench() workbench.connect_workbench(connect_params)
ここで必要なのは、スケジュール オブジェクトをインスタンス化し、データ フローまたはワークフローを参照し、タイミングと頻度を設定し、アクティブ化してワークベンチに保存することだけです。
すぐに実行できるものを作成して、ロイヤルカスタマーチームに最初のレポートを提供できるようにしましょう。
schedule = Schedule("DF_LONG_TERM_SUBSCRIBER_SEGMENT Immediate")\ .dataflow(project="MOVIESTREAM",dataflow_name="DF_LONG_TERM_SUBSCRIBER_SEGMENT")\ .immediate()\ .schedule_status(SCHEDULE_STATUS_ACTIVE) workbench.save_schedule(schedule)
また、毎月 1 日の午前 6 時にデータ フローを実行する 2 番目のスケジュールも追加します。
schedule = Schedule("DF_LONG_TERM_SUBSCRIBER_SEGMENT Monthly Second to Last Day")\ .dataflow(project="MOVIESTREAM",dataflow_name="DF_LONG_TERM_SUBSCRIBER_SEGMENT")\ .monthly("01", "06:00:00")\ .schedule_status(SCHEDULE_STATUS_ACTIVE) workbench.save_schedule(schedule)
このスクリプトを実行すると、Oracle Data TransformsのWeb UIで結果を確認できます。2つの新しいスケジュールが表示され、そのうち1つはジョブページで既に実行されています。
SQL Developerを開いてAutonomous Databaseのデータを確認することもできます。新しいLONG_TERM_SUBSCRIBER_SEGMENTテーブルが作成され、Loyal Customerチームから要求されたデータが含まれていることがわかります。
まとめ
Oracle Data Transforms Python APIは、新しいデータエンティティとデータフローをプログラムで簡単に作成する方法を提供します。また、即時実行または後で実行するようにスケジュール設定することも可能です。
今後の記事では、Oracle Data Transforms Python APIのその他のユースケースについてもご紹介していきます。どうぞお楽しみに!
コメント
コメントを投稿