To use the Kafka Java client with Streaming, you must
have the following:
An Oracle Cloud Infrastructure account.
A user created in that account, in a group with a policy that grants the required
permissions. For an example of how to set up a new user, group, compartment, and policy,
see Adding Users. For a list of typical policies you
may want to use, see Common Policies.
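For illustration only, a policy similar to the following lets a group work with Streaming resources in one compartment; the group name StreamingUsers and the compartment placeholder are assumptions for this example, so adapt them to your tenancy:
Allow group StreamingUsers to manage streams in compartment <compartment_name>
Allow group StreamingUsers to manage stream-pools in compartment <compartment_name>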
Authentication with the Kafka protocol uses auth tokens and the SASL/PLAIN mechanism.
Refer to Working with Auth Tokens for auth token generation. If you created the stream and stream pool in
OCI, you are already authorized to use this
stream according to OCI
IAM, so you should create auth tokens for your OCI user.
Note
OCI user auth tokens are visible only at
the time of creation. Copy the token and keep it somewhere safe for future use.
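The producer and consumer code that follows passes these credentials to the Kafka client through the sasl.jaas.config property: the SASL user name is the composite <OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>, and the password is the auth token. As a sketch, the resulting client settings have this shape:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>" \
  password="<your_OCI_user_auth_token>";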
Producing Messages 🔗
Open your favorite editor, such as Visual Studio Code, from the directory
wd. You should already have the Kafka SDK dependencies for
Java as part of the pom.xml of your Maven Java project after
you've met the prerequisites.
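If the Kafka client dependency isn't already in your pom.xml, it's declared like this; the version shown is only an example, so use the version called for in the prerequisites:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- example version only; use the version from the prerequisites -->
    <version>2.8.0</version>
</dependency>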
Create a new file named Producer.java in the directory
wd under the path
/src/main/java/kafka/sdk/oss/example/ with the following
code. Replace the values of the variables in the code as directed by the code
comments, namely bootstrapServers through
streamOrKafkaTopicName. These variables are the Kafka
connection settings that you gathered in the prerequisites.
package kafka.sdk.oss.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {
    static String bootstrapServers = "<bootstrap_servers_endpoint>"; // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092
    static String tenancyName = "<OCI_tenancy_name>";
    static String username = "<your_OCI_username>";
    static String streamPoolId = "<stream_pool_OCID>";
    static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
    static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section

    private static Properties getKafkaProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // SASL/PLAIN JAAS config: the user name is <tenancy>/<username>/<stream_pool_OCID>, the password is the auth token
        final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/"
                + username + "/"
                + streamPoolId + "\" "
                + "password=\""
                + authToken + "\";";
        properties.put("sasl.jaas.config", value);
        properties.put("retries", 3); // retries on transient errors and load balancing disconnection
        properties.put("max.request.size", 1024 * 1024); // limit request size to 1 MB
        return properties;
    }

    public static void main(String[] args) {
        try {
            Properties properties = getKafkaProperties();
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i);
                producer.send(record, (md, ex) -> {
                    if (ex != null) {
                        System.err.println("exception occurred in producer for record: " + record.value()
                                + ", exception is " + ex);
                        ex.printStackTrace();
                    } else {
                        System.err.println("Sent msg to partition " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp());
                    }
                });
            }
            // producer.send() is async; flush() makes sure all buffered messages are sent before closing
            producer.flush();
            producer.close();
        } catch (Exception e) {
            System.err.println("Error: exception " + e);
        }
    }
}
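One way to compile and run the producer from wd is with Maven and the exec-maven-plugin; this is a sketch rather than the only option, and it assumes the plugin can be resolved from Maven's default plugin groups:
mvn clean compile
mvn exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
Each successful send prints the partition, offset, and timestamp reported by the broker; failures print the exception instead.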
Consuming Messages 🔗
Open your favorite editor, such as Visual Studio Code, from the directory
wd. You should already
have the Kafka SDK dependencies for Java as part of the pom.xml
of your Maven Java project after you've met the prerequisites.
Create a new file named Consumer.java in the directory
wd under the path
/src/main/java/kafka/sdk/oss/example/ with the following code. Replace the values of the variables
in the code as directed by the code comments, namely
bootstrapServers through
consumerGroupName. These variables are the Kafka connection
settings that you gathered in the prerequisites.
package kafka.sdk.oss.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    static String bootstrapServers = "<bootstrap_servers_endpoint>"; // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092
    static String tenancyName = "<OCI_tenancy_name>";
    static String username = "<your_OCI_username>";
    static String streamPoolId = "<stream_pool_OCID>";
    static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
    static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section
    static String consumerGroupName = "<consumer_group_name>";

    private static Properties getKafkaProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", consumerGroupName);
        props.put("enable.auto.commit", "false"); // offsets are committed explicitly with commitSync() below
        props.put("session.timeout.ms", "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("auto.offset.reset", "earliest");
        // SASL/PLAIN JAAS config: the user name is <tenancy>/<username>/<stream_pool_OCID>, the password is the auth token
        final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/"
                + username + "/"
                + streamPoolId + "\" "
                + "password=\""
                + authToken + "\";";
        props.put("sasl.jaas.config", value);
        return props;
    }

    public static void main(String[] args) {
        // keys and values are Strings because StringDeserializer is configured above
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getKafkaProperties());
        consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        System.out.println("size of records polled is " + records.count());
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
        }
        consumer.commitSync();
        consumer.close();
    }
}
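The consumer is built and run the same way; again, the exec-maven-plugin invocation is just one possible approach:
mvn clean compile
mvn exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
With messages available on the stream, the combined Maven and consumer output looks similar to the following: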
[INFO related maven compiling and building the Java code]
size of records polled is 3
Received message: (messageKey0, message value) at offset 1284
Received message: (messageKey0, message value) at offset 1285
Received message: (null, message produced using oci console) at offset 1286