スキーマ・レジストリとApache Kafkaの統合

スキーマ・レジストリは、指定されたサブジェクト名に基づいてすべてのスキーマのバージョン管理された履歴を格納し、多くの互換性設定を提供し、時間の経過とともにスキーマを進化させることができます。Schema Registryは、Kafkaブローカ外およびKafkaブローカとは別に存在します。

スキーマ・レジストリには、次の3つの主要コンポーネントがあります。

  • レジストリWebサーバー: スキーマ・エンティティの管理に使用されるRESTエンドポイントを公開するWebアプリケーション。Webプロキシおよびロード・バランサを多くのWebサーバーとともに使用して、HAおよびスケーラビリティを提供します。スキーマ・レジストリHAを有効にするには、スキーマ・レジストリHAを参照してください。
  • プラガブル・ストレージ/スキーマ・メタデータ・ストレージ: スキーマ・エンティティのメタデータを保持するリレーショナル・ストア。インメモリー・ストレージおよびMySQLデータベースがサポートされています。
  • Serdes Storage: シリアライザおよびデシリアライザjar用のファイル・ストレージ。ローカル・ファイル・システムおよびHDFS記憶域がサポートされています。デフォルトは、ローカルのファイルシステムストレージです。

スキーマ・レジストリの主なコンポーネントは次のとおりです。

  • レジストリWebサーバー: スキーマ・エンティティの管理に使用されるRESTエンドポイントを公開するWebアプリケーション。Webプロキシおよび多数のWebサーバーを含むLoad Balancerを使用して、HAおよびスケーラビリティを提供します。スキーマ・レジストリHAを参照してください。
  • プラガブル・ストレージのスキーマ・メタデータ・ストレージ: スキーマ・エンティティのメタデータを保持するリレーショナル・ストア。インメモリー・ストレージおよびMySQLデータベースがサポートされています。
  • Serdes Storage: シリアライザおよびデシリアライザjar用のファイル・ストレージ。ローカル・ファイル・システムおよびHDFS記憶域がサポートされています。デフォルトは、ローカルのファイルシステムストレージです。

スキーマ・レジストリとの相互作用

スキーマ・レジストリ・クライアントを使用して、スキーマ・レジストリ・サーバーと直接対話し、スキーマにアクセスまたは変更します。
ノート

JAAS構成の設定は、Kerberos対応クラスタ(HAクラスタ)にのみ必要です。
  1. レジストリ・クライアントを使用してJAAS構成ファイルを作成します。
    RegistryClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/path/to/registry/keytab"
      storeKey=true
      useTicketCache=false
      principal="principal@realm";
    };
  2. Javaコードで、Kafkaプロデューサ構成に次を追加します:
    import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
    ..
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty("schema.registry.url", "http://<host name>:9093/api/v1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
  3. クライアント構成でenv変数を次のように設定して、スキーマ・レジストリ・クライアントを作成します。
    System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
    
    Map<String, Object> config = new HashMap<>();
    config.put("schema.registry.url", "http://<registry host name>:9093/api/v1");
    
    String topicName = "my-topic";
    SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
    
    try {
        SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
            .type(AvroSchemaProvider.TYPE)
            .schemaGroup("sample-group")
            .description("Sample schema")
            .compatibility(SchemaCompatibility.BACKWARD)
            .build();
        SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersion(schemaMetadata, new SchemaVersion(schemaMetadata, "Initial version of the schema"));
        LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
    } catch (SchemaNotFoundException e) {
        LOG.info("Schema addition failed for topic {}", topicName);
        throw e;
    }}