To use the Kafka Python client with Streaming, you must
have the following:
1. An Oracle Cloud Infrastructure account.
2. A user created in that account, in a group with a policy that grants the required
   permissions. For an example of how to set up a new user, group, compartment, and policy,
   see Adding Users. For a list of typical policies you
   may want to use, see Common Policies.
3. Python 3.6 or later, with pip installed and updated.
4. Visual Studio Code (recommended) or any other integrated development environment
   (IDE).
5. Install the confluent-kafka package for Python using the following
   command:
   pip install confluent-kafka
   Note
   You can install this package globally or within a virtualenv. The librdkafka library is used by the
   confluent-kafka package and is embedded in the wheels for the latest
   confluent-kafka release. For more details, refer to the Confluent Python client documentation.
6. Install the SSL CA root certificates on the host where you are developing and running
   this quickstart. The client uses CA certificates to verify the broker's certificate.
7. Authentication with the Kafka protocol uses auth tokens and the SASL/PLAIN mechanism.
   Refer to Working with Auth Tokens for auth token generation. If you created the stream and stream pool in
   OCI, you are already authorized to use this stream according to OCI IAM,
   so you should create an auth token for your OCI user.
   Note
   An OCI user auth token is visible only at
   the time of creation. Copy it and keep it somewhere safe for future use.
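The SASL/PLAIN settings and the CA certificate location from the prerequisites can be gathered in one helper, so that the auth token does not have to be hard-coded in source files. The following is a minimal sketch, not a definitive implementation: the function name build_sasl_ssl_conf is an assumption chosen for illustration, while the configuration keys and the username format are the ones used in the code of this quickstart. When no CA file is given, it falls back to the path reported by Python's standard ssl module, which may be unset on some hosts.

```python
import ssl


def build_sasl_ssl_conf(bootstrap_servers, tenancy, username, stream_pool_ocid,
                        auth_token, ca_file=None):
    """Return a client configuration dict for SASL_SSL with the PLAIN mechanism.

    Hypothetical helper for illustration; the keys mirror the conf map
    used by the producer and consumer in this quickstart.
    """
    if ca_file is None:
        # Ask the ssl module for the default CA bundle; this can be None.
        ca_file = ssl.get_default_verify_paths().cafile
    conf = {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        # SASL username format: <tenancy>/<user>/<stream pool OCID>
        'sasl.username': '{}/{}/{}'.format(tenancy, username, stream_pool_ocid),
        'sasl.password': auth_token,
    }
    if ca_file is not None:
        conf['ssl.ca.location'] = ca_file
    return conf
```

The auth token could then be read from an environment variable (for example os.environ['OCI_AUTH_TOKEN'], a hypothetical name) and passed as auth_token.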
Producing Messages
Open your favorite editor, such as Visual Studio Code, from the empty working
directory wd. You should already have the
confluent-kafka package for Python installed in your current
Python environment after you've met the prerequisites.
Create a file named Producer.py in the
wd directory with the following code. Replace the config
values in the map conf, and set topic to the name of the
stream you created.
from confluent_kafka import Producer

if __name__ == '__main__':

    topic = "<topic_stream_name>"
    conf = {
        'bootstrap.servers': "<bootstrap_servers_endpoint>",  # usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
        # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
        # 3. 'ssl.ca.location': certifi.where()
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
    }

    # Create Producer instance
    producer = Producer(**conf)
    delivered_records = 0

    # Optional per-message on_delivery handler (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently failed delivery after retries.
    def acked(err, msg):
        """Delivery report handler called on
        successful or failed delivery of message."""
        global delivered_records
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))

    for n in range(10):
        record_key = "messageKey" + str(n)
        record_value = "messageValue" + str(n)
        print("Producing record: {}\t{}".format(record_key, record_value))
        producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)
        # producer.poll() serves delivery reports (on_delivery) from previous produce() calls.
        producer.poll(0)

    # Block until all outstanding messages are delivered or have failed.
    producer.flush()

    print("{} messages were produced to topic {}!".format(delivered_records, topic))
From the wd directory, run the following command:
python Producer.py
Use the Console to see the latest messages sent to the stream to verify
that production was successful.
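The delivery callback in Producer.py updates a module-level counter through global. An alternative, sketched here under the assumption that a small helper class is acceptable, keeps the counts on an object whose bound method is passed as on_delivery. The class name DeliveryCounter is hypothetical; the callback signature (err, msg) and the msg.topic(), msg.partition(), and msg.offset() accessors are the ones the producer code already uses.

```python
class DeliveryCounter:
    """Counts delivery reports; pass counter.on_delivery to produce()."""

    def __init__(self):
        self.delivered = 0
        self.failed = 0

    def on_delivery(self, err, msg):
        # err is None when the message was delivered successfully.
        if err is not None:
            self.failed += 1
            print("Failed to deliver message: {}".format(err))
        else:
            self.delivered += 1
            print("Produced record to topic {} partition [{}] @ offset {}".format(
                msg.topic(), msg.partition(), msg.offset()))
```

With this in place, the loop would call producer.produce(topic, key=record_key, value=record_value, on_delivery=counter.on_delivery) and report counter.delivered after producer.flush().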
Consuming Messages
Open your favorite editor, such as Visual Studio Code, from the empty working
directory wd. You should already have the
confluent-kafka package for Python installed in your current
Python environment after you've met the prerequisites.
Create a file named Consumer.py in the
wd directory with the following code. Replace the config
values in the map conf, and set topic to the name of the
stream you created.
from confluent_kafka import Consumer

if __name__ == '__main__':

    topic = "<topic_stream_name>"
    conf = {
        'bootstrap.servers': "<bootstrap_servers_endpoint>",  # usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
        # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
        # 3. 'ssl.ca.location': certifi.where()
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
        'group.id': '<consumer_group_name>',  # a group name of your choice; the Consumer requires group.id to be set
    }

    # Create Consumer instance
    consumer = Consumer(conf)

    # Subscribe to topic
    consumer.subscribe([topic])

    # Process messages
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # No message available within timeout.
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting for message or event/error in poll()")
                continue
            elif msg.error():
                print('error: {}'.format(msg.error()))
            else:
                # Regular Kafka message: decode the key and value
                record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
                record_value = msg.value().decode('utf-8')
                print("Consumed record with key " + record_key + " and value " + record_value)
    except KeyboardInterrupt:
        pass
    finally:
        print("Leave group and commit final offsets")
        consumer.close()
From the wd directory, run the following command:
python Consumer.py
You should see messages similar to the following:
Waiting for message or event/error in poll()
Waiting for message or event/error in poll()
Consumed record with key messageKey0 and value messageValue0
Consumed record with key messageKey1 and value messageValue1
Consumed record with key Null and value Example test message
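The last line of the sample output shows the key Null, which is what the consumer prints for a message that was sent without a key. The decoding step can be isolated into a small function, sketched below. The name format_record is hypothetical, and the sketch assumes keys and values are UTF-8 encoded bytes, as in the consumer code. It also tolerates a missing value, which Consumer.py does not handle.

```python
def format_record(key, value):
    """Return the text the consumer prints for one record.

    key and value are the raw results of msg.key() and msg.value(),
    each either bytes or None.
    """
    record_key = "Null" if key is None else key.decode('utf-8')
    record_value = "Null" if value is None else value.decode('utf-8')
    return "Consumed record with key " + record_key + " and value " + record_value


print(format_record(b"messageKey0", b"messageValue0"))
print(format_record(None, b"Example test message"))
```

Inside the poll loop, this would be called as print(format_record(msg.key(), msg.value())).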