Oracle Kubernetes Engine上のOracle StreamingサービスによるKafka Connect (2022/08/31)
Oracle Kubernetes Engine上のOracle StreamingサービスによるKafka Connect (2022/08/31)
投稿者:Nitin Soni | Principal Solution Architect
Kafka Connectは、外部システムからKafkaトピックへのデータの移動、またはその逆を可能にする、拡張可能なフレームワークです。Kafkaシンクまたはソースコネクターは、フレームワークが外部システムに接続するためのコンポーネントです。一般的なデータソースとシンクのために多くのソースとシンクコネクタを使用したり、カスタムユースケースのためにコネクタを実装することができます。
これらのコネクタを実行するには、KafkaブローカーとKafka Connectランタイムに依存する必要があります。これらの依存関係を管理する負担を軽減するために、KafkaブローカーについてはOracle Cloud Infrastructure(OCI)のStreamingサービスに依存することができます。OCI Streamingは、Kafka Connect構成を提供することで、Kafka Connectランタイムをサポートします。各Connect構成は、Kafka Connectランタイムが接続するための構成、ステータス、およびオフセットトピックを提供します。Kafka Connectランタイムは、Oracle Container Engine for Kubernetes(OKE)上でホストすることができます。
このブログ記事では、OKEクラスター上でOCI Streamingを使用してKafka Connectランタイムをセットアップする方法を中心に説明します。
依存関係
- OCI Streaming service
- Oracle Container Engine for Kubernetes (OKE)
- Virtual cloud network (VCN)
- Kafka ConnectランタイムDockerイメージ
Kafka Connectの設定を作成
クラウドアカウントにログインし、「Analytics」セクションで「Streaming」を選択します。Kafka Connectの設定メニューで、設定を作成するオプションを選択します。
作成後、設定、ステータス、ストレージのトピック名を使用します。Kafka ConnectのDockerイメージの設定中に使用するので、保存してください。
OKEのセットアップ
すでにK8クラスターが稼働していて、それを利用してKafka Connectランタイムをセットアップする場合は、このセクションをスキップできます。
K8 クラスターを初めてセットアップする場合は、Oracle Cloud Console の「Kubernetes Clusters (OKE)」セクションで提供される Quick Create ワークフローを使用できます。詳細については、「コンソールを使用して『Quick Create』ワークフローでデフォルト設定のクラスターを作成」を参照してください。
また、Terraformスクリプトを使用して、クラスターをすばやく起動することができます。この方法のクイックスタートは、GitHubで公開されています。
OKEでKafka Connectランタイムを設定
まず、K8でネームスペースを作成します。
kubectl create ns kafka-connect
以下の内容でプロパティファイルconnect-distributed.envを作成します。
LOG_LEVEL=INFO
BOOTSTRAP_SERVERS=cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092
GROUP_ID=grp-1
CONFIG_STORAGE_TOPIC=<connect_configuration_ocid>-config
OFFSET_STORAGE_TOPIC=<connect_configuration_ocid>-offset
CONNECT_STATUS_STORAGE_TOPIC=<connect_configuration_ocid>-status
CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
CONNECT_SECURITY_PROTOCOL=SASL_SSL
CONNECT_SASL_MECHANISM=PLAIN
CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<user>/<stream_pool_ocid>" password="<auth_code>";
CONNECT_PRODUCER_BUFFER_MEMORY=10240
CONNECT_PRODUCER_BATCH_SIZE=2048
CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tanancy>/<user>/<stream_pool_ocid>" password="<auth_code>";
CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="<teanancy>/<user>/<stream_pool_ocid>" password="<auth_code>";
次のコマンドを実行すると、「kafka-connect」名前空間に「kafka-connect-config」という名前でシークレットが作成されます。これらの環境変数は、Kafka Connect ランタイムを実行するコンテナで利用可能です。
kubectl create secret generic kafka-connect-config --from-env-file=connect-distributed.env -n kafka-connect
次の内容でデプロイメントファイル(kafka-connect.yaml)を作成します。
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-connect-deployment
spec:
selector:
matchLabels:
app: kafka-connect
replicas: 1 # tells deployment to run 1 pods matching the template
template: # create pods using pod definition in this template
metadata:
labels:
app: kafka-connect
spec:
containers:
- image: debezium/connect:1.2.4.Final
name: kafka-connect
ports:
- containerPort: 8083
envFrom:
- secretRef:
name: kafka-connect-config
---
apiVersion: v1
kind: Service
metadata:
name: kafka-connect-lb
labels:
app: kafka-connect-lb
annotations:
oci.oraclecloud.com/load-balancer-type: "lb"
spec:
type: LoadBalancer
ports:
- port: 80
protocol: TCP
targetPort: 8083
selector:
app: kafka-connect
このデプロイの詳細については、以下の点に注意してください。
- Kafka ランタイムは debezium/connect:1.2.4.Final Docker イメージを使用します。
- このデプロイメントのコンテナは、Kafka Connect ランタイム用にポート 8083 を公開します。このポートは、REST API のクエリをリッスンします。API のリファレンスは、Kafka のドキュメントを参照してください。
- コンテナ内の環境変数は、secret "kafka-connect-config"を使用して設定します。
- ロードバランサーを作成し、展開された複数のポッド間のトラフィックを分散させます。ロードバランサーのポート80とPodのポート8083をバインドします。OKE のロードバランサーの詳細については、「Defining Kubernetes Services of Type LoadBalancer」を参照してください。
K8クラスターのkafka-connectネームスペースにデプロイメントを作成します。
kubectl apply -f kafka-connect.yml -n kafka-connect
コンテナのログを確認し、ポッドが正常に起動したかどうかを確認します。
kubectl logs --follow deployment/kafka-connect-deployment -n kafka-connect
また、k8ダッシュボードでデプロイメントを確認することもできます。OKEでのK8ダッシュボードの設定については、「Kubernetesダッシュボードを使ったクラスターへのアクセス」を参照してください。
これで、Kafka Connect ランタイムと対話し、Kafka Connect REST API を使用してコネクターを開始する準備が整いました。次のコードブロックは、ファイルソースコネクターを実行する例を示しています。
curl -i -X POST -H "Content-Type: application/json" -d "@file_connector.json" http://<ip and="" port="" of="" lb="">/connectors
file_connector.jsonの内容です。
{
"name": "file-connector",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": "1",
"topic": "from_log_file",
"batch.size": "100",
"file": "/kafka/logs/connect-service.log"
}
}
Kafka Connectランタイムにバンドルされているコネクターは、ファイルの内容をトピックに移動します。ここでは、"from_log_file"という名前で、ログファイルの内容をトピックに移動するコネクタを起動します。
このコネクタを実行した後、Oracle Cloud Consoleからload messagesをクリックすることで、トピックの内容を確認することができます。
まとめ
Kafka Connectエコシステムには、データパイプラインを構築するために使用できる多くのコネクターがあります。この機能を実現するために、OCI StreamingとOKEは、Kafka Connectのランタイムを実行するためのプラットフォームを提供します。
このソリューションをご自身でお試しください Oracle Cloud Free Tierにサインアップするか、アカウントにサインインしてください。
コメント
コメントを投稿