Oracle Kubernetes Engine上のOracle StreamingサービスによるKafka Connect (2022/08/31)

Oracle Kubernetes Engine上のOracle StreamingサービスによるKafka Connect (2022/08/31)

https://blogs.oracle.com/cloud-infrastructure/post/kafka-connect-with-oracle-streaming-service-on-oracle-kubernetes-engine

投稿者:Nitin Soni | Principal Solution Architect


Kafka Connectは、外部システムからKafkaトピックへのデータの移動、またはその逆を可能にする、拡張可能なフレームワークです。Kafkaシンクまたはソースコネクターは、フレームワークが外部システムに接続するためのコンポーネントです。一般的なデータソースとシンクのために多くのソースとシンクコネクタを使用したり、カスタムユースケースのためにコネクタを実装することができます。


これらのコネクタを実行するには、KafkaブローカーとKafka Connectランタイムに依存する必要があります。これらの依存関係を管理する負担を軽減するために、KafkaブローカーについてはOracle Cloud Infrastructure(OCI)のStreamingサービスに依存することができます。OCI Streamingは、Kafka Connect構成を提供することで、Kafka Connectランタイムをサポートします。各Connect構成は、Kafka Connectランタイムが接続するための構成、ステータス、およびオフセットトピックを提供します。Kafka Connectランタイムは、Oracle Container Engine for Kubernetes(OKE)上でホストすることができます。


このブログ記事では、OKEクラスター上でOCI Streamingを使用してKafka Connectランタイムをセットアップする方法を中心に説明します。



依存関係




Kafka Connectの設定を作成


クラウドアカウントにログインし、「Analytics」セクションで「Streaming」を選択します。Kafka Connectの設定メニューで、設定を作成するオプションを選択します。



作成後、設定、ステータス、ストレージのトピック名を使用します。Kafka ConnectのDockerイメージの設定中に使用するので、保存してください。




OKEのセットアップ


すでにK8クラスターが稼働していて、それを利用してKafka Connectランタイムをセットアップする場合は、このセクションをスキップできます。


K8 クラスターを初めてセットアップする場合は、Oracle Cloud Console の「Kubernetes Clusters (OKE)」セクションで提供される Quick Create ワークフローを使用できます。詳細については、「コンソールを使用して『Quick Create』ワークフローでデフォルト設定のクラスターを作成」を参照してください。


また、Terraformスクリプトを使用して、クラスターをすばやく起動することができます。この方法のクイックスタートは、GitHubで公開されています。



OKEでKafka Connectランタイムを設定


まず、K8でネームスペースを作成します。

kubectl create ns kafka-connect


以下の内容でプロパティファイルconnect-distributed.envを作成します。

LOG_LEVEL=INFO

BOOTSTRAP_SERVERS=cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092
GROUP_ID=grp-1

CONFIG_STORAGE_TOPIC=<connect_configuration_ocid>-config
OFFSET_STORAGE_TOPIC=<connect_configuration_ocid>-offset
CONNECT_STATUS_STORAGE_TOPIC=<connect_configuration_ocid>-status

CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
CONNECT_SECURITY_PROTOCOL=SASL_SSL
CONNECT_SASL_MECHANISM=PLAIN
CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<user>/<stream_pool_ocid>" password="<auth_code>";

CONNECT_PRODUCER_BUFFER_MEMORY=10240
CONNECT_PRODUCER_BATCH_SIZE=2048
CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tanancy>/<user>/<stream_pool_ocid>" password="<auth_code>";

CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="<teanancy>/<user>/<stream_pool_ocid>" password="<auth_code>";


次のコマンドを実行すると、「kafka-connect」名前空間に「kafka-connect-config」という名前でシークレットが作成されます。これらの環境変数は、Kafka Connect ランタイムを実行するコンテナで利用可能です。

kubectl create secret generic kafka-connect-config --from-env-file=connect-distributed.env -n kafka-connect


次の内容でデプロイメントファイル(kafka-connect.yaml)を作成します。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect-deployment
spec:
  selector:
    matchLabels:
      app: kafka-connect
  replicas: 1 # tells deployment to run 1 pods matching the template
  template: # create pods using pod definition in this template
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
      - image: debezium/connect:1.2.4.Final
        name: kafka-connect
        ports:
        - containerPort: 8083
        envFrom:
          - secretRef:
              name: kafka-connect-config

---

apiVersion: v1
kind: Service
metadata:
  name: kafka-connect-lb
  labels:
    app: kafka-connect-lb
  annotations:
    oci.oraclecloud.com/load-balancer-type: "lb"
spec:
  type: LoadBalancer
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8083
  selector:
    app: kafka-connect


このデプロイの詳細については、以下の点に注意してください。


  •     Kafka ランタイムは debezium/connect:1.2.4.Final Docker イメージを使用します。
  •     このデプロイメントのコンテナは、Kafka Connect ランタイム用にポート 8083 を公開します。このポートは、REST API のクエリをリッスンします。API のリファレンスは、Kafka のドキュメントを参照してください。
  •     コンテナ内の環境変数は、secret "kafka-connect-config"を使用して設定します。
  •     ロードバランサーを作成し、展開された複数のポッド間のトラフィックを分散させます。ロードバランサーのポート80とPodのポート8083をバインドします。OKE のロードバランサーの詳細については、「Defining Kubernetes Services of Type LoadBalancer」を参照してください。


K8クラスターのkafka-connectネームスペースにデプロイメントを作成します。

kubectl apply -f kafka-connect.yml -n kafka-connect

コンテナのログを確認し、ポッドが正常に起動したかどうかを確認します。

kubectl logs --follow deployment/kafka-connect-deployment -n kafka-connect


また、k8ダッシュボードでデプロイメントを確認することもできます。OKEでのK8ダッシュボードの設定については、「Kubernetesダッシュボードを使ったクラスターへのアクセス」を参照してください。



これで、Kafka Connect ランタイムと対話し、Kafka Connect REST API を使用してコネクターを開始する準備が整いました。次のコードブロックは、ファイルソースコネクターを実行する例を示しています。

curl -i -X POST -H "Content-Type: application/json" -d "@file_connector.json" http://<ip and="" port="" of="" lb="">/connectors


file_connector.jsonの内容です。

{
    "name": "file-connector",
    "config": {
       "connector.class": "FileStreamSource",
       "tasks.max": "1",
       "topic": "from_log_file",
        "batch.size": "100",
       "file": "/kafka/logs/connect-service.log"
    }
}


Kafka Connectランタイムにバンドルされているコネクターは、ファイルの内容をトピックに移動します。ここでは、"from_log_file"という名前で、ログファイルの内容をトピックに移動するコネクタを起動します。


このコネクタを実行した後、Oracle Cloud Consoleからload messagesをクリックすることで、トピックの内容を確認することができます。





まとめ


Kafka Connectエコシステムには、データパイプラインを構築するために使用できる多くのコネクターがあります。この機能を実現するために、OCI StreamingとOKEは、Kafka Connectのランタイムを実行するためのプラットフォームを提供します。


このソリューションをご自身でお試しください Oracle Cloud Free Tierにサインアップするか、アカウントにサインインしてください。


コメント

このブログの人気の投稿

Oracle Database 19cサポート・タイムラインの重要な更新 (2024/11/20)

Oracle APEXのInteractive Gridで、Oracle Formsと比較して、重複行の検証を制御/通過させる方法 (2022/07/21)

Oracle APEX 24.1の一般提供の発表 (2024/06/17)