This quickstart shows you how to use the Oracle Cloud Infrastructure (OCI) SDK for Java and Oracle Cloud Infrastructure Streaming to publish and consume messages.
Refer to the Overview of Streaming for key concepts and more Streaming details. For more information about using the OCI SDKs, see the SDK Guides.
Prerequisites
To use the SDK for Java, you must have the following:
An Oracle Cloud Infrastructure account.
A user created in that account, in a group with a policy that grants the required
permissions. The user can be yourself, or another person/system that needs to call the
API. For an example of how to set up a new user, group, compartment, and policy, see
Adding Users. For a list of typical policies you
may want to use, see Common Policies.
A key pair used for signing API requests, with the public key uploaded to Oracle. Only
the user calling the API should possess the private key.
Collect the Messages endpoint and OCID of a stream. See Listing Streams and Stream Pools for instructions on viewing stream details. For the purposes of this quickstart, the
stream should use a public endpoint and let Oracle manage encryption. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream.
JDK 8 or above installed. Ensure that Java is in your PATH.
Maven 3.0 or later installed. Ensure that Maven is in your PATH.
IntelliJ (recommended) or any other integrated development environment (IDE).
Add the latest version of the Maven dependency or JAR for the OCI Java SDK for IAM to your
pom.xml.
Producing Messages
Open your favorite editor, such as Visual Studio Code, from the directory wd.
After you've met the prerequisites, the oci-sdk dependencies for Java should
already be part of the pom.xml of your Maven Java project.
Create a file named Producer.java in the directory wd with the following code.
Replace the values of the variables configurationFilePath, profile,
ociStreamOcid, and ociMessageEndpoint in the code with the values applicable
to your tenancy.
package oci.sdk.oss.example;

import com.oracle.bmc.ConfigFileReader;
import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.streaming.StreamClient;
import com.oracle.bmc.streaming.model.PutMessagesDetails;
import com.oracle.bmc.streaming.model.PutMessagesDetailsEntry;
import com.oracle.bmc.streaming.model.PutMessagesResultEntry;
import com.oracle.bmc.streaming.requests.PutMessagesRequest;
import com.oracle.bmc.streaming.responses.PutMessagesResponse;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;

import static java.nio.charset.StandardCharsets.UTF_8;

public class Producer {
    public static void main(String[] args) throws Exception {
        final String configurationFilePath = "<config_file_path>";
        final String profile = "<config_file_profile_name>";
        final String ociStreamOcid = "<stream_OCID>";
        final String ociMessageEndpoint = "<stream_message_endpoint>";

        // Load the named profile from the given config file, so that the two
        // values above are actually used (parseDefault() would ignore them).
        final ConfigFileReader.ConfigFile configFile =
                ConfigFileReader.parse(configurationFilePath, profile);
        final AuthenticationDetailsProvider provider =
                new ConfigFileAuthenticationDetailsProvider(configFile);

        // Streams are assigned a specific endpoint URL based on where they are provisioned.
        // Create a stream client using the provided message endpoint.
        StreamClient streamClient =
                StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);

        // Publish some messages to the stream.
        publishExampleMessages(streamClient, ociStreamOcid);
    }

    private static void publishExampleMessages(StreamClient streamClient, String streamId) {
        // Build up a put request and publish some messages to the stream.
        List<PutMessagesDetailsEntry> messages = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            messages.add(
                    PutMessagesDetailsEntry.builder()
                            .key(String.format("messageKey%s", i).getBytes(UTF_8))
                            .value(String.format("messageValue%s", i).getBytes(UTF_8))
                            .build());
        }

        System.out.println(
                String.format("Publishing %s messages to stream %s.", messages.size(), streamId));
        PutMessagesDetails messagesDetails =
                PutMessagesDetails.builder().messages(messages).build();
        PutMessagesRequest putRequest =
                PutMessagesRequest.builder()
                        .streamId(streamId)
                        .putMessagesDetails(messagesDetails)
                        .build();
        PutMessagesResponse putResponse = streamClient.putMessages(putRequest);

        // The response carries one result entry per message; entries with an
        // error set were not published.
        for (PutMessagesResultEntry entry : putResponse.getPutMessagesResult().getEntries()) {
            if (StringUtils.isNotBlank(entry.getError())) {
                System.out.println(
                        String.format("Error(%s): %s", entry.getError(), entry.getErrorMessage()));
            } else {
                System.out.println(
                        String.format(
                                "Published message to partition %s, offset %s.",
                                entry.getPartition(),
                                entry.getOffset()));
            }
        }
    }
}
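The producer only prints per-entry errors. As a rough illustration of how the failed messages could be gathered for a retry, here is a minimal stand-alone sketch. It does not use the SDK: plain strings stand in for PutMessagesDetailsEntry and for the value of getError(), the class and method names (FailedEntryRetrySketch, collectFailed) are hypothetical, and it assumes that result entries come back in the same order as the submitted messages, which is worth confirming against the PutMessages API reference before relying on it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch: plain strings replace the SDK's message and result types.
class FailedEntryRetrySketch {

    // Returns the messages whose result entry carries a non-blank error.
    // Assumption: errors.get(i) is the result for messages.get(i).
    static List<String> collectFailed(List<String> messages, List<String> errors) {
        if (messages.size() != errors.size()) {
            throw new IllegalArgumentException("expected one result per message");
        }
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            String error = errors.get(i);
            // Same check as StringUtils.isNotBlank in the producer, without the dependency.
            if (error != null && !error.trim().isEmpty()) {
                failed.add(messages.get(i));
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> messages =
                Arrays.asList("messageValue0", "messageValue1", "messageValue2");
        // A null error means the message was published; the error text is made up.
        List<String> errors = Arrays.asList(null, "InternalError", null);
        System.out.println(collectFailed(messages, errors)); // prints [messageValue1]
    }
}
```

In the real producer, the returned list would be the input to another putMessages call, ideally after a short delay and with a cap on the number of attempts.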
Consuming Messages
Open your favorite editor, such as Visual Studio Code, from the directory wd.
After you've met the prerequisites, the oci-sdk dependencies for Java should
already be part of the pom.xml of your Maven Java project.
Create a file named Consumer.java in the directory wd with the following code.
Replace the values of the variables configurationFilePath, profile,
ociStreamOcid, and ociMessageEndpoint in the code with the values applicable
to your tenancy.
package oci.sdk.oss.example;

import com.google.common.util.concurrent.Uninterruptibles;
import com.oracle.bmc.ConfigFileReader;
import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.streaming.StreamClient;
import com.oracle.bmc.streaming.model.CreateGroupCursorDetails;
import com.oracle.bmc.streaming.model.Message;
import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest;
import com.oracle.bmc.streaming.requests.GetMessagesRequest;
import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse;
import com.oracle.bmc.streaming.responses.GetMessagesResponse;

import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;

public class Consumer {
    public static void main(String[] args) throws Exception {
        final String configurationFilePath = "<config_file_path>";
        final String profile = "<config_file_profile_name>";
        final String ociStreamOcid = "<stream_OCID>";
        final String ociMessageEndpoint = "<stream_message_endpoint>";

        // Load the named profile from the given config file, so that the two
        // values above are actually used (parseDefault() would ignore them).
        final ConfigFileReader.ConfigFile configFile =
                ConfigFileReader.parse(configurationFilePath, profile);
        final AuthenticationDetailsProvider provider =
                new ConfigFileAuthenticationDetailsProvider(configFile);

        // Streams are assigned a specific endpoint URL based on where they are provisioned.
        // Create a stream client using the provided message endpoint.
        StreamClient streamClient =
                StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);

        // A cursor can be created as part of a consumer group.
        // Committed offsets are managed for the group, and partitions
        // are dynamically balanced amongst consumers in the group.
        System.out.println("Starting a simple message loop with a group cursor");
        String groupCursor =
                getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
        simpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
    }

    private static void simpleMessageLoop(
            StreamClient streamClient, String streamId, String initialCursor) {
        String cursor = initialCursor;
        for (int i = 0; i < 10; i++) {
            GetMessagesRequest getRequest =
                    GetMessagesRequest.builder()
                            .streamId(streamId)
                            .cursor(cursor)
                            .limit(25)
                            .build();
            GetMessagesResponse getResponse = streamClient.getMessages(getRequest);

            // Process the messages.
            System.out.println(String.format("Read %s messages.", getResponse.getItems().size()));
            for (Message message : getResponse.getItems()) {
                System.out.println(
                        String.format(
                                "%s: %s",
                                message.getKey() == null ? "Null" : new String(message.getKey(), UTF_8),
                                new String(message.getValue(), UTF_8)));
            }

            // getMessages is a throttled method; clients should retrieve sufficiently
            // large message batches to avoid making too many HTTP requests.
            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);

            // Use the next cursor for the following iteration.
            cursor = getResponse.getOpcNextCursor();
        }
    }

    private static String getCursorByGroup(
            StreamClient streamClient, String streamId, String groupName, String instanceName) {
        System.out.println(
                String.format(
                        "Creating a cursor for group %s, instance %s.", groupName, instanceName));
        CreateGroupCursorDetails cursorDetails =
                CreateGroupCursorDetails.builder()
                        .groupName(groupName)
                        .instanceName(instanceName)
                        .type(CreateGroupCursorDetails.Type.TrimHorizon)
                        .commitOnGet(true)
                        .build();
        CreateGroupCursorRequest createCursorRequest =
                CreateGroupCursorRequest.builder()
                        .streamId(streamId)
                        .createGroupCursorDetails(cursorDetails)
                        .build();
        CreateGroupCursorResponse groupCursorResponse =
                streamClient.createGroupCursor(createCursorRequest);
        return groupCursorResponse.getCursor().getValue();
    }
}
Running the consumer prints output similar to the following (abridged):
Starting a simple message loop with a group cursor
Creating a cursor for group exampleGroup, instance exampleInstance-1.
Read 25 messages.
Null: Example Test Message 0
Null: Example Test Message 0
Read 2 messages.
Null: Example Test Message 0
Null: Example Test Message 0
Read 1 messages.
Null: Example Test Message 0
Read 10 messages.
key 0: value 0
key 1: value 1
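The cursor handling in simpleMessageLoop (fetch a batch, process it, continue from the returned next cursor) can be tried without a live stream. The following is a minimal sketch under stated assumptions: Batch is a hypothetical stand-in for GetMessagesResponse, the fetch function replaces streamClient.getMessages, and the in-memory map is a fake stream invented for the example. None of these names are part of the SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class CursorLoopSketch {

    // Hypothetical stand-in for GetMessagesResponse: items plus the next cursor.
    static final class Batch {
        final List<String> items;
        final String nextCursor;

        Batch(List<String> items, String nextCursor) {
            this.items = items;
            this.nextCursor = nextCursor;
        }
    }

    // Polls fetch up to maxPolls times, always continuing from the cursor
    // returned by the previous call, and returns everything that was read.
    static List<String> drain(Function<String, Batch> fetch, String initialCursor, int maxPolls) {
        List<String> received = new ArrayList<>();
        String cursor = initialCursor;
        for (int i = 0; i < maxPolls; i++) {
            Batch batch = fetch.apply(cursor);
            received.addAll(batch.items);
            cursor = batch.nextCursor; // never reuse the old cursor
        }
        return received;
    }

    public static void main(String[] args) {
        // Fake stream: each cursor maps to exactly one batch.
        Map<String, Batch> fake = new HashMap<>();
        fake.put("c0", new Batch(Arrays.asList("a", "b"), "c1"));
        fake.put("c1", new Batch(Arrays.asList("c"), "c2"));
        fake.put("c2", new Batch(Collections.<String>emptyList(), "c2"));
        System.out.println(drain(fake::get, "c0", 3)); // prints [a, b, c]
    }
}
```

An empty batch still yields a next cursor in this sketch, so the loop can keep polling. A real consumer would also pause between polls, as the quickstart does, because getMessages is throttled.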