This quickstart shows you how to use the Oracle Cloud Infrastructure (OCI) SDK for Python and Oracle Cloud Infrastructure Streaming to publish and consume messages.
Refer to the Overview of Streaming for key concepts and more Streaming details. For more information about using the OCI SDKs, see the SDK Guides.
Prerequisites
To use the SDK for Python, you must have the following:
An Oracle Cloud Infrastructure account.
A user created in that account, in a group with a policy that grants the required
permissions. This user can be yourself, or another person/system that needs to call the
API. For an example of how to set up a new user, group, compartment, and policy, see
Adding Users. For a list of typical policies you
may want to use, see Common Policies.
A key pair used for signing API requests, with the public key uploaded to Oracle. Only
the user calling the API should possess the private key.
Collect the Messages endpoint and OCID of a stream. See Listing Streams and Stream Pools for instructions on viewing stream details. For the purposes of this quickstart, the
stream should use a public endpoint and let Oracle manage encryption. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream.
Python 3.6 or later, with PIP installed and updated.
Visual Code Studio (recommended) or any other integrated development environment
(IDE).
Install oci-sdk packages for Python using the following command:
Copy
pip install oci
Note
We recommend that you use a Python virtual environment when installing
oci. See Downloading and Installing the SDK for more
information.
Open your favorite editor, such as Visual Studio Code, from the directory
wd. You should already have oci-sdk packages for
Python installed for your current Python environment after you've met the prerequisites.
Create a file named Producer.py in the wd
directory with following code. Replace values of variables
ociConfigFilePath,
ociProfileName,ociStreamOcid, and
ociMessageEndpoint in the following code snippet with the values
applicable for your tenancy.
Copy
import oci
from base64 import b64encode
ociMessageEndpoint = "<stream_message_endpoint>"
ociStreamOcid = "<stream_OCID>"
ociConfigFilePath = "<config_file_path>"
ociProfileName = "<config_file_profile_name>"
def produce_messages(client, stream_id):
# Build up a PutMessagesDetails and publish some messages to the stream
message_list = []
for i in range(100):
key = "messageKey" + str(i)
value = "messageValue " + str(i)
encoded_key = b64encode(key.encode()).decode()
encoded_value = b64encode(value.encode()).decode()
message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))
print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
put_message_result = client.put_messages(stream_id, messages)
# The put_message_result can contain some useful metadata for handling failures
for entry in put_message_result.data.entries:
if entry.error:
print("Error ({}) : {}".format(entry.error, entry.error_message))
else:
print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))
config = oci.config.from_file(ociConfigFilePath, ociProfileName)
stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
# Publish some messages to the stream
produce_messages(stream_client, ociStreamOcid)
From the wd directory, run the following command:
Copy
python Producer.py
Use
the Console to see the latest messages sent to the stream to verify that production
was successful.
Open your favorite editor, such as Visual Studio Code, from the directory
wd. You should already have oci-sdk packages for
Python installed for your current Python environment after ensuring you have the prerequisites.
Create a file named Consumer.py in directory wd
with following code. Replace values of variables ociConfigFilePath,
ociProfileName,ociStreamOcid, and
ociMessageEndpoint in the following code snippet with the values
applicable for your tenancy.
Copy
import oci
import time
from base64 import b64decode
ociMessageEndpoint = "<stream_message_endpoint>"
ociStreamOcid = "<stream_OCID>"
ociConfigFilePath = "<config_file_path>"
ociProfileName = "<config_file_profile_name>"
def get_cursor_by_group(sc, sid, group_name, instance_name):
print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name))
cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name,
type=oci.streaming.models.
CreateGroupCursorDetails.TYPE_TRIM_HORIZON,
commit_on_get=True)
response = sc.create_group_cursor(sid, cursor_details)
return response.data.value
def simple_message_loop(client, stream_id, initial_cursor):
cursor = initial_cursor
while True:
get_response = client.get_messages(stream_id, cursor, limit=10)
# No messages to process. return.
if not get_response.data:
return
# Process the messages
print(" Read {} messages".format(len(get_response.data)))
for message in get_response.data:
if message.key is None:
key = "Null"
else:
key = b64decode(message.key.encode()).decode()
print("{}: {}".format(key,
b64decode(message.value.encode()).decode()))
# get_messages is a throttled method; clients should retrieve sufficiently large message
# batches, as to avoid too many http requests.
time.sleep(1)
# use the next-cursor for iteration
cursor = get_response.headers["opc-next-cursor"]
config = oci.config.from_file(ociConfigFilePath, ociProfileName)
stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
# A cursor can be created as part of a consumer group.
# Committed offsets are managed for the group, and partitions
# are dynamically balanced amongst consumers in the group.
group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1")
simple_message_loop(stream_client, ociStreamOcid, group_cursor)
From the wd directory, run the following command:
Copy
python Consumer.py
You should see messages similar to the following:
Starting a simple message loop with a group cursor
Creating a cursor for group example-group, instance example-instance-1
Read 2 messages
Null: Example Test Message 0
Null: Example Test Message 0
Read 2 messages
Null: Example Test Message 0
Null: Example Test Message 0
Read 1 messages
Null: Example Test Message 0
Read 10 messages
key 0: value 0
key 1: value 1