スキーマ・レジストリの使用

スキーマ・レジストリは、アプリケーションが相互に柔軟に対話できるようにするスキーマの共有リポジトリを提供します。

多くの場合、構築されたアプリケーションは、次の3つのディメンション間でメタデータを共有する方法を必要とします。

  • データ形式

  • スキーマ

  • データのセマンティクスまたは意味

スキーマ・レジストリが提供する値と、スキーマ・レジストリと統合するアプリケーションは次のとおりです。

  • 一元化されたレジストリ: すべてのデータにスキーマをアタッチしないように、再利用可能なスキーマを提供します。

  • バージョン管理: コンシューマとプロデューサが異なるレートで進化できるように、スキーマ・バージョン間の関係を定義します。

  • スキーマ検証: 汎用フォーマット変換、汎用ルーティングおよびデータ品質を有効にします。

なぜスキーマ・レジストリ?

Kafkaの分離された性質のため、プロデューサとコンシューマは直接通信しません。かわりに、情報転送はKafkaトピックを介して行われます。同時に、コンシューマは、プロデューサがデシリアライズするために送信するデータのタイプを知る必要があります。プロデューサが不正なデータのKafkaへの送信を開始した場合、またはデータ型が変更された場合、ダウンストリーム・コンシューマは中断し始めます。合意された共通のデータ型を持つ方法が必要です。

ここにスキーマ・レジストリがあります。スキーマ・レジストリは、Kafkaクラスタの外部に存在するアプリケーションで、スキーマのコピーをローカル・キャッシュに格納することで、スキーマのプロデューサおよびコンシューマへの分散を処理します。

使用例

Schema Registryには、クライアント・アプリケーションでスキーマを一元的に使用できるようにするためのRest APIのセット(Webインタフェースを使用)が用意されています。一般的なユースケースは次のとおりです。

  1. Kafkaトピックのスキーマの登録および問合せ

    Kafkaがエンタープライズ組織デプロイメントに統合されている場合、通常、様々なアプリケーションおよびユーザーが使用する多くの異なるKafkaトピックがあります。

  2. Kafkaトピックからのデータの読取り/デシリアライズおよび書込み/シリアライズ

    また、スキーマ・メタデータの格納に加えて、データの読取り方法および書込み方法の書式のメタデータを格納する重要なユースケースもあります。スキーマ・レジストリでは、シリアライザおよびデシリアライザのJARファイルを格納し、その血清をスキーマにマップする機能を提供することで、このユースケースをサポートしています。

  3. スキーマベースのルーティングを使用したデータフロー管理

    NiFiを使用して様々なタイプのsyslogイベントをダウンストリーム・システムに移動する場合、syslogイベントを解析してイベント・タイプを抽出し、イベント・タイプに基づいて特定のダウンストリーム・システム(異なるKafkaトピックなど)にイベントをルーティングする必要があるデータ移動要件があります。

  4. 動的スキーマの進化により、スキーマは一定期間にわたって進化できます。

    スキーマ内の更新によって、コンシューマに通信される新しいリビジョンが作成されます。

  5. データ品質のために、コンシューマ側からのスキーマの強制/検証をシームレスに行うことができます。

Javaクライアントを使用したKafkaプロデューサ/コンシューマの起動

  1. pom.xmlで、Kafkaプロデューサ構成に次を追加します:
    import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
    ..
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty("schema.registry.url", "http://<host name>:9093/api/v1");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

    フォーカスする構成は次のとおりです。

    • schema.registry.url
    • key.serializer
    • value.serializer
  2. Kafkaコンシューマ構成に次のテキストを追加します:
    import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
    ..
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty("schema.registry.url", "http://<host name>:9093/api/v1");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
  3. スキーマ・レジストリの起動後、Web UI (http://<FQDN>:9093)にアクセスします。

ODHクラスタへのApacheスキーマ・レジストリの追加

Ambariを使用して、Apacheスキーマ・レジストリをビッグ・データ・サービス・クラスタに追加できます

スキーマ・レジストリは、デフォルトではビッグ・データ・サービス・クラスタにインストールされません。スキーマ・レジストリでは、スキーマ・メタデータを格納するための2つのタイプのストレージがサポートされています。

  • Mysql: 設定される値: mysql
  • メモリー内: 設定される値: inmemory

適切な記憶域タイプを設定するには、Ambari UIの「スキーマ・レジストリ」セクションの「拡張」でStorage Typeパラメータを設定します。

Mysqlを記憶域タイプとして設定するには、サーバーを起動する前に複数の表を作成する必要があります。これは、Storage Typeパラメータが mysqlに設定されているときに自動的に行われます。

ノート

スキーマ・レジストリ・サービスのインストール時には、MySql Passowrdパラメータは必須です。このパスワードは、Mysql表に格納されているスキーマ情報にアクセスするために使用されます。

ビッグ・データ・サービス3.0.28以前のクラスタの場合、次のコマンドを実行して、MySql Passowrdをun0ノードから手動でリセットする必要があります:

sudo update_mysql_password registry <password>
                            

スキーマ・レジストリ・サービスのインストール時にこのパスワードを指定します。

  1. Apache Ambariにアクセスします。
  2. 「サービス」メニューから「サービスの追加」を選択します。
  3. サービスの追加ウィザードで、「スキーム・レジストリ」を選択し、「次へ」を選択します。
  4. 次へ
  5. Advanced schema-registryパラメータの「サービスのカスタマイズ」ページで、「Mysqlパスワード」を適切な値に更新します。

    これにより、レジストリMySQLデータベースのパスワードがリセットされます。

  6. 「次へ」を選択します。
  7. 構成を確認し、「デプロイ」を選択します。
  8. インストールが完了するまで待ってから、「次へ」を選択します。
  9. 「完了」を選択します。
  10. 「サービス」メニューから必要なサービスをすべて再起動します。
  11. インストールが成功したことを確認するには、Apache Ambariダッシュボードからスキーマ・レジストリにアクセスし、「アクション」を選択します。
  12. 「サービス・チェックの実行」を選択します。