[OCI]Oracle Functions と Oracle Streaming Serviceの統合 (2020/10/01)

 Oracle Functions と Oracle Streaming Serviceの統合 (2020/10/01)

https://blogs.oracle.com/developers/integrating-oracle-functions-and-oracle-streaming-service
投稿者:Igor Aragao de Souza | Principal Big Data Consultant

こんにちは。

クラウド上でアプリケーション・コードを実行するためのソリューションとして、サーバーレス・コンピュートがますます好まれるようになってきています。
企業はリアルタイム性を重視するイベントドリブンの時代へと移行しています。
リアルタイム性とストリーミング・データについて言えば、ストリーミング・サービスは当然の選択です。

オラクルのサーバーレス・コンピュート・プラットフォーム、Oracle Functions、およびOracle Streaming Serviceの機能を組み合わせることで、
非常に簡単にデータを生成してストリームに消費できるという、両方の世界的な利点を実現しています。

サーバーレスでイベントドリブンへの旅

TL;DR

  • クラウド上でアプリケーションコードを実行するためのソリューションとして、サーバーレスコンピュートがますます好まれるようになってきています。
  • ここ数年、革命が起きており、組織はリアルタイムにならなければならず、リアルタイムになるためには、イベントドリブンにならなければなりません。
  • Streaming Serviceは、リアルタイムとストリーミングデータの話をするときの自然な選択です。
  • ストリーミングをチェックして、ロジックが有効なときにFunctionをトリガーするコードがある状況を想像してみてください。
  • Streaming ServiceのプロデューサーまたはコンシューマーをトリガーするOracle Functionを作成できることをご存知でしょうか。
  • Oracle FunctionはStreamingのプロデューサーやコンシューマーを呼び出すことができます。
  • Oracle Streaming ServiceはOracle Functionをトリガーすることができます。

この記事では、Oracle Function(FN Project)を使用してOracle Streaming Service(Kafka)トピックから
メッセージを発行したり、消費したりするシナリオを構築し、Oracle Streaming ServiceがOracle Functionをトリガーする方法を紹介します。

前提条件

  • Access to Oracle Cloud
  • OCI Functions
  • OCI Streaming


Oracle Functionsのアーキテクチャは、Streaming Serviceのプロデューサーと消費者を呼び出す


Streaming Serviceのアーキテクチャは、Oracle Functionsをトリガーにしています。


Function as a Service (FaaS)とは、クラウド上のどこかでFunctionを実行する機能のことです。
Functionには特注のロジック・ブロックが含まれており、API Gatewayのようなレジストリを介して呼び出されたり、
クラウド関連のイベント(Oracle Object Storeに書き込まれたデータなど)によってスケジュールされたり、トリガーされたりします。

Functionは一定期間実行された後に終了することが予想され、クラウド・ベンダーはミリ秒単位で課金します(および関連メモリも)。
Functionは多くのインスタンスを並行して実行することができ、最初の "コールド "スタートコールの後、それらは "ホット "とみなされます。
最初の呼び出しでは、データベース接続(など)を初期化する必要があります。
API Gatewayを介して呼び出す場合、Functionは値を返すために同期的に呼び出されることがあります。

FaaSには多くのメリットがあります。

  • 非常にコスト効率が良い
  • スケーラブル
  • 軽量
  • 分離された

で、もちろんStreaming Serviceのプロデューサーやコンシューマーを呼び出すFunctionを作成することも可能です。

Oracle Streaming Service(OSS)はKafkaに対応しているので、Kafka producerやConsumerを含むKafka APIを利用することができます。

3つのシナリオに分けることができます。

A. Oracle Functionは、Streaming Serviceのプロデューサーを呼び出す
B. Oracle Functionは、Streaming Serviceの消費者を呼び出す
C. Oracle Streaming Serviceは、Oracle Functionをトリガする


Oracle FunctionsとStreaming Serviceのプロデューサーとコンシューマーを統合するアイデアやユースケースをいくつかご紹介します。

  • OCIインフラストラクチャからデータを読み書きする。(AまたはB)
  • REST コール(AまたはB)からproducerまたはconsumerをスタートさせる
  • OCIイベント(AまたはB)からproducerまたはconsumerのトリガー
  • 他のクラウドプロバイダーにデータを送る(A)
  • オンプレミスデータの読み込み(B)
  • 株式市場の値が1%成長すると、私の株式(C)を売却するFunctionをトリガ
  • センサーが新しい値を送信するのに5秒以上かかる場合は、何らかのアクションを実行するためのFunctionをトリガー(C)
  • 温度値が特定の値(C)よりも大きい場合
  • モータの始動回数が100回よりも大きい場合(C)

これは網羅的なリストではありません。



いいね、私はすべての利点を理解し、私は私のFunctionを作成する準備ができています。

コード化しよう


シナリオA

Oracle Functionは、ストリーミングプロデューサーのPythonの例を呼び出します。

import io
from kafka import KafkaProducer
from fdk import response


def handler(ctx, data: io.BytesIO = None):

try:
body = json.loads(data.getvalue())
except (Exception, ValueError) as ex:
logging.getLogger().info('error parsing json payload: ' + str(ex))

producer = KafkaProducer(bootstrap_servers = 'streaming.{region}.oci.oraclecloud.com:9092',
security_protocol = 'SASL_SSL', sasl_mechanism = 'PLAIN',
sasl_plain_username = '{tenancyName}/{username}/{stream_pool_OCID}',
sasl_plain_password = '{authToken}')
key = 'key1'.encode('utf-8')
data = body

producer.send('yourTopic', key=key, value=data)


シナリオB

Oracle Functionは、コンシューマーPythonの例を呼び出す

import io
from kafka import KafkaConsumer
from fdk import response


def handler(ctx, data: io.BytesIO = None):

consumer = KafkaConsumer('yourTopic', bootstrap_servers = 'streaming.{region}.oci.oraclecloud.com:9092',
security_protocol = 'SASL_SSL', sasl_mechanism = 'PLAIN',
consumer_timeout_ms = 10000, auto_offset_reset = 'earliest',
group_id='group-0',
sasl_plain_username = '{tenancyName}/{username}/{stream_pool_OCID}',
sasl_plain_password = '{authToken}')

for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))


シナリオA&B

Oracle Functionは、Java Oracle SDKの例を使用してStreaming Serviceを呼び出す

public StreamProducerFunction() {

try {
String privateKey = System.getenv().get("OCI_PRIVATE_KEY_FILE_NAME");
Supplier<InputStream> privateKeySupplier = () -> {
InputStream is = null;
String ociPrivateKeyPath = "/function/" + privateKey;
System.err.println("Private key location - " + ociPrivateKeyPath);

try {
is = new FileInputStream(ociPrivateKeyPath);
} catch (FileNotFoundException ex) {
System.err.println("Problem accessing OCI private key at " + ociPrivateKeyPath + " - " + ex.getMessage());
}

return is;

};

authProvider
= SimpleAuthenticationDetailsProvider.builder()
.tenantId(System.getenv().get("TENANCY"))
.userId(System.getenv().get("USER"))
.fingerprint(System.getenv().get("FINGERPRINT"))
.passPhrase(System.getenv().get("PASSPHRASE"))
.privateKeySupplier(privateKeySupplier)
.build();

sAdminClient = new StreamAdminClient(authProvider);
String region = System.getenv().get("REGION"); //e.g. us-phoneix-1
sAdminClient.setEndpoint("https://streams." + region + ".streaming.oci.oraclecloud.com");

} catch (Throwable ex) {
System.err.println("Error occurred in StreamProducerFunction constructor - " + ex.getMessage());
}
}


コード全体はこちら


シナリオA&B

Oracle Functionは、Java Micronautの例を使用してStreaming Serviceを呼び出す

Announcing the Micronaut Oracle Cloud Module for Simple SDK Integration and Performant Serverless Functions

Oracle Python SDK

Oracle Java SDK





API Gatewayを使用して、FunctionをトリガするためのRESTインターフェースを公開することができます。



シナリオC

この部分はKafka Connectを使用しています。

Oracle Streaming ServiceのトリガーOracle Functions

@Override
public void put(Collection<SinkRecord> records) {

for (SinkRecord record : records) {

//check business logic
if(triggerFn(record)) {
try {
FnHTTPPost fnPOST = new FnHTTPPost(config.getTenantOcid(), config.getUserOcid(),
config.getPublicFingerprint(), config.getPrivateKeyLocation());
String fnPOSTResult = fnPOST.invoke(config.getFunctionUrl(), (String) record.value());
} catch (Exception e) {
e.printStackTrace();
}
}

}
}

....

/**
* @param SinkRecord record
* here is the business logic to decide if trigger or not a FN function.
*/
private boolean triggerFn(SinkRecord record){

boolean trigger = true;

if(record.value() != null){
// do some validation here
}

return trigger;
}


メソッド "triggerFn" は、Functionをトリガーするためのビジネスロジックを追加する場所です。

コード全体はこちら


リンク

OCI Functions

Overview of OCI Functions

Micronaut OCI Functions

OCI Streaming

Getting Started With Oracle Streaming Service OSS

Oracle Streaming Service Producer & Consumer

OCI API Gateway

Getting Started with API Gateway

Oracle Functions GitHub

Micronaut OCI Functions GitHub

Photo by Taylor Vick on Unsplash

コメント

このブログの人気の投稿

Oracle Database 19cサポート・タイムラインの重要な更新 (2024/11/20)

Oracle APEXのInteractive Gridで、Oracle Formsと比較して、重複行の検証を制御/通過させる方法 (2022/07/21)

Oracle APEX 24.1の一般提供の発表 (2024/06/17)