Confluentスキーマ・レジストリからビッグ・データ・サービス・スキーマ・レジストリへの移行
ビッグ・データ・サービス・スキーマ・レジストリには、V1とV2という2つの異なるバージョンのAPIが用意されています。
これらのAPIは、スキーマ・メタデータとスキーマ・バージョンおよびそのライフサイクルの管理に使用されます。
ビッグ・データ・サービスでは、スキーマ・バージョンIDが自動生成されます。したがって、スキーマ・テキストが様々なスキーマ間で同じままの場合でも、生成されるIDは常に異なります。
例:
{
"id": 1,
"name": "sample-topic-1",
"schemaText": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
}
{
"id": 2,
"name": "sample-topic-2",
"schemaText": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
}
schemaTextは同じですが、IDは異なります。ただし、Confluent Schema Registryから移行する場合、重複するschemaTextのIDは同じになります。
この問題を解決するために、schemaTextがスキーマ間で同じである場合、同じIDを保持するために2つのV2 APIを使用できます。
SERDEの使用
SERDEでは、次のV2 APIが内部的に使用されます。
KafkaAvroSerializerV2 and KafkaAvroDeserializerV2
-
これらのSERDEクラスを使用するには、
レジストリ・クライアントを使用してJAAS構成ファイルを作成します。
RegistryClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/path/to/registry/keytab" storeKey=true useTicketCache=false principal="principal@realm"; };
- Javaコードで、Kafkaプロデューサ構成に次を追加します:
import com.hortonworks.registries.schemaregistry.v2.serdes.avro.kafka.KafkaAvroSerializerV2; .. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.setProperty("schema.registry.url", "http://<host name>:9093/api/v2"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializerV2.class.getName());
スキーマ・レジストリ・クライアントの使用
スキーマ・レジストリ・クライアントをJAVAコードで使用して新しいスキーマ・バージョンを追加するV2 APIの使用例:
System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.url", "http://<registry host name>:9093/api/v2");
String topicName = "my-topic";
SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
try {
SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
.type(AvroSchemaProvider.TYPE)
.schemaGroup("sample-group")
.description("Sample schema")
.compatibility(SchemaCompatibility.BACKWARD)
.build();
SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersionV2(schemaMetadata, new SchemaVersion(schemaMetadata, "Initial version of the schema"));
LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
} catch (SchemaNotFoundException e) {
LOG.info("Schema addition failed for topic {}", topicName);
throw e;
}}