This quickstart shows you how to use the Oracle Cloud Infrastructure (OCI) SDK for TypeScript and JavaScript and Oracle Cloud Infrastructure Streaming to publish and consume messages.
Refer to the Overview of Streaming for key concepts and more Streaming details. For more information about using the OCI SDKs, see the SDK Guides.
Prerequisites
To use the SDK for TypeScript and JavaScript, you must have the following:
An Oracle Cloud Infrastructure account.
A user created in that account, in a group with a policy that grants the required
permissions. This user can be yourself, or another person/system that needs to call the
API. For an example of how to set up a new user, group, compartment, and policy, see
Adding Users. For a list of typical policies you
may want to use, see Common Policies.
A key pair used for signing API requests, with the public key uploaded to Oracle. Only the user calling the API should possess the private key. For more information, see Getting Started.
Collect the Messages endpoint and OCID of a stream. See Listing Streams and Stream Pools for instructions on viewing stream details. For the purposes of this quickstart, the
stream should use a public endpoint and let Oracle manage encryption. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream.
Visual Studio Code (recommended) or any other integrated development environment
(IDE).
Open a command prompt that has npm in its path, change to the directory
where you want to keep your code for this quickstart (wd, for
example), and then run the following command to install the OCI SDK for JavaScript:
npm install oci-sdk
Open your favorite editor, such as Visual Studio Code, from the directory
wd. You should already have the oci-sdk packages for
JavaScript installed in this directory after you've met the prerequisites.
Create a file named Producer.js in the wd
directory with the following code. Replace the values of the variables ociConfigFile,
ociProfileName, ociStreamOcid, and
ociMessageEndpointForStream in the following code snippet with the
values applicable for your tenancy.
Copy
// Producer setup: SDK packages, tenancy-specific settings, and the authentication provider.
// `common` supplies the authentication provider class; `st` supplies the stream client.
const common = require("oci-common");
const st = require("oci-streaming"); // OCI SDK package for OSS
// Placeholders: replace each value below with the one applicable for your tenancy.
const ociConfigFile = "<config_file_path>";
const ociProfileName = "<config_file_profile_name>";
const ociMessageEndpointForStream = "<stream_message_endpoint>";
const ociStreamOcid = "<stream_OCID>";
// provide authentication for OCI and OSS
// The provider is built from the config file path and profile name set above.
const provider = new common.ConfigFileAuthenticationDetailsProvider(ociConfigFile, ociProfileName);
/**
 * Publishes three sample messages to the configured stream and logs the
 * partition and offset reported for each entry of the result.
 *
 * Uses the module-level `provider`, `ociMessageEndpointForStream`, and
 * `ociStreamOcid` values.
 *
 * @returns {Promise<void>} Resolves after every result entry has been logged;
 *   rejects if building the client or the putMessages call fails.
 */
async function main() {
  // OSS client to produce and consume messages from a Stream in OSS
  const client = new st.StreamClient({ authenticationDetailsProvider: provider });
  client.endpoint = ociMessageEndpointForStream;

  // build up a putRequest and publish some messages to the stream;
  // keys and values are sent as base64-encoded strings
  const messages = [];
  for (let i = 1; i <= 3; i++) {
    messages.push({
      key: Buffer.from("messageKey" + i).toString("base64"),
      value: Buffer.from("messageValue" + i).toString("base64")
    });
  }

  console.log("Publishing %s messages to stream %s.", messages.length, ociStreamOcid);
  const putMessagesRequest = {
    putMessagesDetails: { messages: messages },
    streamId: ociStreamOcid
  };
  const putMessageResponse = await client.putMessages(putMessagesRequest);

  // One log line per result entry, in the order the response lists them.
  for (const entry of putMessageResponse.putMessagesResult.entries) {
    console.log("Published messages to partition %s, offset %s", entry.partition, entry.offset);
  }
}
// Run the producer. A rejection is reported on stderr and the exit code is set
// to non-zero so shells and scripts can tell the run failed.
main().catch((err) => {
  console.error("Error occurred: ", err);
  process.exitCode = 1;
});
From the wd directory, run the following command:
Copy
node Producer.js
Use
the Console to see the latest messages sent to the stream to verify that production
was successful.
Open your favorite editor, such as Visual Studio Code, from the directory
wd. You should already have the oci-sdk packages for
JavaScript installed in this directory after you've met the prerequisites.
Create a file named Consumer.js in the wd
directory with the following code. Replace the values of the variables ociConfigFile,
ociProfileName, ociStreamOcid, and
ociMessageEndpointForStream in the following code snippet with the
values applicable for your tenancy.
Copy
// Consumer setup: SDK packages, tenancy-specific settings, authentication, and group identity.
// `common` supplies the authentication provider class; `st` supplies the stream client and models.
const common = require("oci-common");
const st = require("oci-streaming"); // OCI SDK package for OSS
// Placeholders: replace each value below with the one applicable for your tenancy.
const ociConfigFile = "<config_file_path>";
const ociProfileName = "<config_file_profile_name>";
const ociMessageEndpointForStream = "<stream_message_endpoint>";
const ociStreamOcid = "<stream_OCID>";
// provide authentication for OCI and OSS
// The provider is built from the config file path and profile name set above.
const provider = new common.ConfigFileAuthenticationDetailsProvider(ociConfigFile, ociProfileName);
// Consumer group name and instance name passed to getCursorByGroup when the group cursor is created.
const consumerGroupName = "exampleGroup";
const consumerGroupInstanceName = "exampleInstance-1";
/**
 * Consumer entry point: builds a stream client, obtains a group cursor, and
 * hands both to the message loop.
 *
 * Uses the module-level `provider`, `ociMessageEndpointForStream`,
 * `ociStreamOcid`, `consumerGroupName`, and `consumerGroupInstanceName` values.
 *
 * @returns {Promise<void>} Resolves when the message loop finishes.
 */
async function main() {
  // Stream client authenticated with the shared provider and pointed at the
  // stream's messages endpoint.
  const streamClient = new st.StreamClient({ authenticationDetailsProvider: provider });
  streamClient.endpoint = ociMessageEndpointForStream;

  // A cursor can be created as part of a consumer group.
  // Committed offsets are managed for the group, and partitions
  // are dynamically balanced amongst consumers in the group.
  console.log("Starting a simple message loop with a group cursor");
  const startingCursor = await getCursorByGroup(
    streamClient,
    ociStreamOcid,
    consumerGroupName,
    consumerGroupInstanceName
  );

  await consumerMsgLoop(streamClient, ociStreamOcid, startingCursor);
}
// Run the consumer. A rejection is reported on stderr and the exit code is set
// to non-zero so shells and scripts can tell the run failed.
main().catch((err) => {
  console.error("Error occurred: ", err);
  process.exitCode = 1;
});
/**
 * Polls the stream ten times, printing each message's decoded key and value.
 *
 * Each poll requests at most two messages and then continues from the cursor
 * returned with that response.
 *
 * @param {object} client - Stream client exposing `getMessages(request)`.
 * @param {string} streamId - Identifier of the stream to read from.
 * @param {string} initialCursor - Cursor used for the first poll.
 * @returns {Promise<void>} Resolves after the tenth poll and its trailing delay.
 */
async function consumerMsgLoop(client, streamId, initialCursor) {
  let cursor = initialCursor;
  for (let i = 0; i < 10; i++) {
    const getRequest = {
      streamId: streamId,
      cursor: cursor,
      limit: 2
    };
    const response = await client.getMessages(getRequest);
    console.log("Read %s messages.", response.items.length);

    for (const message of response.items) {
      const value = Buffer.from(message.value, "base64").toString();
      // `== null` matches both null and undefined, so a message whose key is
      // absent is printed as "Null" instead of making Buffer.from throw.
      if (message.key == null) {
        console.log("Null: %s", value);
      } else {
        console.log("%s: %s", Buffer.from(message.key, "base64").toString(), value);
      }
    }

    // getMessages is a throttled method; clients should retrieve sufficiently large message
    // batches, as to avoid too many http requests.
    await delay(2);
    cursor = response.opcNextCursor;
  }
}
/**
 * Creates a group cursor on the given stream and returns its value.
 *
 * The cursor is requested with the TrimHorizon type and commitOnGet enabled.
 *
 * @param {object} client - Stream client exposing `createGroupCursor(request)`.
 * @param {string} streamId - Identifier of the stream the cursor is created on.
 * @param {string} groupName - Consumer group name placed in the cursor details.
 * @param {string} instanceName - Instance name placed in the cursor details.
 * @returns {Promise<string>} The `cursor.value` field of the create response.
 */
async function getCursorByGroup(client, streamId, groupName, instanceName) {
  console.log("Creating a cursor for group %s, instance %s.", groupName, instanceName);

  const { cursor } = await client.createGroupCursor({
    createGroupCursorDetails: {
      groupName,
      instanceName,
      type: st.models.CreateGroupCursorDetails.Type.TrimHorizon,
      commitOnGet: true
    },
    streamId
  });

  return cursor.value;
}
/**
 * Pauses for the given number of seconds.
 *
 * @param {number} s - Seconds to wait; converted to milliseconds for setTimeout.
 * @returns {Promise<void>} Resolves with no value once the timer fires.
 */
async function delay(s) {
  const milliseconds = s * 1000;
  return new Promise((wake) => {
    setTimeout(wake, milliseconds);
  });
}
From the wd directory, run the following command:
Copy
node Consumer.js
You should see messages similar to the following:
Starting a simple message loop with a group cursor
Creating a cursor for group exampleGroup, instance exampleInstance-1.
Read 1 messages.
Null: Example Test Message 0
Read 1 messages.
Null: Example Test Message 0
Read 1 messages.
Null: Example Test Message 0
Read 2 messages.
Null: Example Test Message 0
Null: Example Test Message 0
Read 2 messages.
Null: Example Test Message 0
Null: Example Test Message 0