[OCI]Oracle Functions と Oracle Streaming Serviceの統合 (2020/10/01)
Oracle Functions と Oracle Streaming Serviceの統合 (2020/10/01)
https://blogs.oracle.com/developers/integrating-oracle-functions-and-oracle-streaming-service
投稿者:Igor Aragao de Souza | Principal Big Data Consultant
こんにちは。
クラウド上でアプリケーション・コードを実行するためのソリューションとして、サーバーレス・コンピュートがますます好まれるようになってきています。
企業はリアルタイム性を重視するイベントドリブンの時代へと移行しています。
リアルタイム性とストリーミング・データについて言えば、ストリーミング・サービスは当然の選択です。
オラクルのサーバーレス・コンピュート・プラットフォーム、Oracle Functions、およびOracle Streaming Serviceの機能を組み合わせることで、
非常に簡単にデータを生成してストリームに消費できるという、両方の世界的な利点を実現しています。
サーバーレスでイベントドリブンへの旅
TL;DR
- クラウド上でアプリケーションコードを実行するためのソリューションとして、サーバーレスコンピュートがますます好まれるようになってきています。
- ここ数年、革命が起きており、組織はリアルタイムにならなければならず、リアルタイムになるためには、イベントドリブンにならなければなりません。
- Streaming Serviceは、リアルタイムとストリーミングデータの話をするときの自然な選択です。
- ストリーミングをチェックして、ロジックが有効なときにFunctionをトリガーするコードがある状況を想像してみてください。
- Streaming ServiceのプロデューサーまたはコンシューマーをトリガーするOracle Functionを作成できることをご存知でしょうか。
- Oracle FunctionはStreamingのプロデューサーやコンシューマーを呼び出すことができます。
- Oracle Streaming ServiceはOracle Functionをトリガーすることができます。
この記事では、Oracle Function(FN Project)を使用してOracle Streaming Service(Kafka)トピックから
メッセージを発行したり、消費したりするシナリオを構築し、Oracle Streaming ServiceがOracle Functionをトリガーする方法を紹介します。
前提条件
- Access to Oracle Cloud
- OCI Functions
- OCI Streaming
Oracle Functionsのアーキテクチャは、Streaming Serviceのプロデューサーと消費者を呼び出す
Streaming Serviceのアーキテクチャは、Oracle Functionsをトリガーにしています。
Function as a Service (FaaS)とは、クラウド上のどこかでFunctionを実行する機能のことです。
Functionには特注のロジック・ブロックが含まれており、API Gatewayのようなレジストリを介して呼び出されたり、
クラウド関連のイベント(Oracle Object Storeに書き込まれたデータなど)によってスケジュールされたり、トリガーされたりします。
Functionは一定期間実行された後に終了することが予想され、クラウド・ベンダーはミリ秒単位で課金します(および関連メモリも)。
Functionは多くのインスタンスを並行して実行することができ、最初の "コールド "スタートコールの後、それらは "ホット "とみなされます。
最初の呼び出しでは、データベース接続(など)を初期化する必要があります。
API Gatewayを介して呼び出す場合、Functionは値を返すために同期的に呼び出されることがあります。
FaaSには多くのメリットがあります。
- 非常にコスト効率が良い
- スケーラブル
- 軽量
- 分離された
で、もちろんStreaming Serviceのプロデューサーやコンシューマーを呼び出すFunctionを作成することも可能です。
Oracle Streaming Service(OSS)はKafkaに対応しているので、Kafka producerやConsumerを含むKafka APIを利用することができます。
3つのシナリオに分けることができます。
A. Oracle Functionは、Streaming Serviceのプロデューサーを呼び出す
B. Oracle Functionは、Streaming Serviceの消費者を呼び出す
C. Oracle Streaming Serviceは、Oracle Functionをトリガする
Oracle FunctionsとStreaming Serviceのプロデューサーとコンシューマーを統合するアイデアやユースケースをいくつかご紹介します。
- OCIインフラストラクチャからデータを読み書きする。(AまたはB)
- REST コール(AまたはB)からproducerまたはconsumerをスタートさせる
- OCIイベント(AまたはB)からproducerまたはconsumerのトリガー
- 他のクラウドプロバイダーにデータを送る(A)
- オンプレミスデータの読み込み(B)
- 株式市場の値が1%成長すると、私の株式(C)を売却するFunctionをトリガ
- センサーが新しい値を送信するのに5秒以上かかる場合は、何らかのアクションを実行するためのFunctionをトリガー(C)
- 温度値が特定の値(C)よりも大きい場合
- モータの始動回数が100回よりも大きい場合(C)
これは網羅的なリストではありません。
いいね、私はすべての利点を理解し、私は私のFunctionを作成する準備ができています。
コード化しよう
シナリオA
Oracle Functionは、ストリーミングプロデューサーのPythonの例を呼び出します。
import io from kafka import KafkaProducer from fdk import response def handler(ctx, data: io.BytesIO = None): try: body = json.loads(data.getvalue()) except (Exception, ValueError) as ex: logging.getLogger().info('error parsing json payload: ' + str(ex)) producer = KafkaProducer(bootstrap_servers = 'streaming.{region}.oci.oraclecloud.com:9092', security_protocol = 'SASL_SSL', sasl_mechanism = 'PLAIN', sasl_plain_username = '{tenancyName}/{username}/{stream_pool_OCID}', sasl_plain_password = '{authToken}') key = 'key1'.encode('utf-8') data = body producer.send('yourTopic', key=key, value=data) |
シナリオB
Oracle Functionは、コンシューマーPythonの例を呼び出す
import io from kafka import KafkaConsumer from fdk import response def handler(ctx, data: io.BytesIO = None): consumer = KafkaConsumer('yourTopic', bootstrap_servers = 'streaming.{region}.oci.oraclecloud.com:9092', security_protocol = 'SASL_SSL', sasl_mechanism = 'PLAIN', consumer_timeout_ms = 10000, auto_offset_reset = 'earliest', group_id='group-0', sasl_plain_username = '{tenancyName}/{username}/{stream_pool_OCID}', sasl_plain_password = '{authToken}') for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) |
シナリオA&B
Oracle Functionは、Java Oracle SDKの例を使用してStreaming Serviceを呼び出す
public StreamProducerFunction() { try { String privateKey = System.getenv().get("OCI_PRIVATE_KEY_FILE_NAME"); Supplier<InputStream> privateKeySupplier = () -> { InputStream is = null; String ociPrivateKeyPath = "/function/" + privateKey; System.err.println("Private key location - " + ociPrivateKeyPath); try { is = new FileInputStream(ociPrivateKeyPath); } catch (FileNotFoundException ex) { System.err.println("Problem accessing OCI private key at " + ociPrivateKeyPath + " - " + ex.getMessage()); } return is; }; authProvider = SimpleAuthenticationDetailsProvider.builder() .tenantId(System.getenv().get("TENANCY")) .userId(System.getenv().get("USER")) .fingerprint(System.getenv().get("FINGERPRINT")) .passPhrase(System.getenv().get("PASSPHRASE")) .privateKeySupplier(privateKeySupplier) .build(); sAdminClient = new StreamAdminClient(authProvider); String region = System.getenv().get("REGION"); //e.g. us-phoneix-1 sAdminClient.setEndpoint("https://streams." + region + ".streaming.oci.oraclecloud.com"); } catch (Throwable ex) { System.err.println("Error occurred in StreamProducerFunction constructor - " + ex.getMessage()); } } |
コード全体はこちら
シナリオA&B
Oracle Functionは、Java Micronautの例を使用してStreaming Serviceを呼び出す
Announcing the Micronaut Oracle Cloud Module for Simple SDK Integration and Performant Serverless Functions
API Gatewayを使用して、FunctionをトリガするためのRESTインターフェースを公開することができます。
シナリオC
この部分はKafka Connectを使用しています。
Oracle Streaming ServiceのトリガーOracle Functions
@Override public void put(Collection<SinkRecord> records) { for (SinkRecord record : records) { //check business logic if(triggerFn(record)) { try { FnHTTPPost fnPOST = new FnHTTPPost(config.getTenantOcid(), config.getUserOcid(), config.getPublicFingerprint(), config.getPrivateKeyLocation()); String fnPOSTResult = fnPOST.invoke(config.getFunctionUrl(), (String) record.value()); } catch (Exception e) { e.printStackTrace(); } } } } .... /** * @param SinkRecord record * here is the business logic to decide if trigger or not a FN function. */ private boolean triggerFn(SinkRecord record){ boolean trigger = true; if(record.value() != null){ // do some validation here } return trigger; } |
メソッド "triggerFn" は、Functionをトリガーするためのビジネスロジックを追加する場所です。
コード全体はこちら
リンク
Getting Started With Oracle Streaming Service OSS
Oracle Streaming Service Producer & Consumer
Getting Started with API Gateway
Micronaut OCI Functions GitHub
Photo by Taylor Vick on Unsplash
コメント
コメントを投稿