Confluentスキーマ・レジストリからビッグ・データ・サービス・スキーマ・レジストリへの移行

ビッグ・データ・サービス・スキーマ・レジストリには、V1とV2という2つの異なるバージョンのAPIが用意されています。

これらのAPIは、スキーマ・メタデータとスキーマ・バージョンおよびそのライフサイクルの管理に使用されます。

ビッグ・データ・サービスでは、スキーマ・バージョンIDが自動生成されます。したがって、スキーマ・テキストが様々なスキーマ間で同じままの場合でも、生成されるIDは常に異なります。

:

{ 
 "id": 1,
  "name": "sample-topic-1",
  "schemaText": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
}

{
  "id": 2,
  "name": "sample-topic-2",
  "schemaText": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
}

schemaTextは同じですが、IDは異なります。ただし、Confluent Schema Registryから移行する場合、重複するschemaTextのIDは同じになります。

この問題を解決するために、schemaTextがスキーマ間で同じである場合、同じIDを保持するために2つのV2 APIを使用できます。

SERDEの使用

SERDEでは、次のV2 APIが内部的に使用されます。

KafkaAvroSerializerV2 and KafkaAvroDeserializerV2
  • これらのSERDEクラスを使用するには、

    レジストリ・クライアントを使用してJAAS構成ファイルを作成します。

    RegistryClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true
            keyTab="/path/to/registry/keytab" storeKey=true useTicketCache=false
            principal="principal@realm"; };
  • Javaコードで、Kafkaプロデューサ構成に次を追加します:
    import com.hortonworks.registries.schemaregistry.v2.serdes.avro.kafka.KafkaAvroSerializerV2;
    ..
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty("schema.registry.url", "http://<host name>:9093/api/v2");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializerV2.class.getName());

スキーマ・レジストリ・クライアントの使用

スキーマ・レジストリ・クライアントをJAVAコードで使用して新しいスキーマ・バージョンを追加するV2 APIの使用例:

System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.url", "http://<registry host name>:9093/api/v2");
String topicName = "my-topic";
SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
 
try {
    SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
        .type(AvroSchemaProvider.TYPE)
        .schemaGroup("sample-group")
        .description("Sample schema")
        .compatibility(SchemaCompatibility.BACKWARD)
        .build();
    SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersionV2(schemaMetadata, new SchemaVersion(schemaMetadata, "Initial version of the schema"));
    LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
} catch (SchemaNotFoundException e) {
    LOG.info("Schema addition failed for topic {}", topicName);
    throw e;
}}