Publish and consume messages in the Streaming service using OCI SDK for .NET.
This quickstart shows you how to use the Oracle Cloud Infrastructure (OCI) SDK for .NET and Oracle Cloud Infrastructure Streaming to publish and consume messages. The examples are written in C#.
For key concepts and more Streaming details, see Overview of Streaming. For more information about using the OCI SDKs, see the SDK Guides.
Prerequisites
Note
In this quickstart, we create and run a simple .NET console application by using Visual
Studio Code and the .NET CLI. Project tasks, such as creating, compiling, and running a
project, are done by using the .NET CLI. If you prefer, you can follow this tutorial with a
different IDE and run the commands in a terminal.
To use the SDK for .NET, you must have the following:
An Oracle Cloud Infrastructure account.
A user created in that account, in a group with a policy that grants the required
permissions. This user can be yourself, or another person/system that needs to call the
API. For an example of how to set up a new user, group, compartment, and policy, see
Adding Users. For a list of typical policies you
may want to use, see Common Policies.
A key pair used for signing API requests, with the public key uploaded to Oracle. Only
the user calling the API should possess the private key. For more information, see SDK configuration file.
Collect the Messages endpoint and OCID of a stream. For steps to get details for a stream, see Getting Details for a Stream. For the purposes of this quickstart, the stream should use a public endpoint and let Oracle manage encryption. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream.
Install the .NET 5.0 SDK or later. Ensure that dotnet is in your
PATH environment variable.
Visual Studio Code (recommended) with the C# extension installed. For information about how
to install extensions on Visual Studio Code, see VS Code Extension Marketplace.
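Before running the examples, it can help to confirm that the profile you plan to use is actually present in the SDK configuration file. The following is a minimal sketch, not part of the SDK: the class name OciConfigCheck and its methods are hypothetical helpers, and the sketch assumes the configuration file is at the conventional ~/.oci/config location and uses INI-style section headers such as [DEFAULT].

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper, not part of the OCI SDK.
public static class OciConfigCheck
{
    // Returns the profile names found in INI-style section headers such as "[DEFAULT]".
    public static List<string> ListProfiles(IEnumerable<string> lines)
    {
        var profiles = new List<string>();
        foreach (string raw in lines)
        {
            string line = raw.Trim();
            if (line.Length > 2 && line.StartsWith("[") && line.EndsWith("]"))
            {
                profiles.Add(line.Substring(1, line.Length - 2).Trim());
            }
        }
        return profiles;
    }

    // Assumption: the configuration file is at the conventional ~/.oci/config location.
    public static string DefaultConfigPath()
    {
        string home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
        return Path.Combine(home, ".oci", "config");
    }
}
```

For example, OciConfigCheck.ListProfiles(File.ReadLines(OciConfigCheck.DefaultConfigPath())) returns the profile names you could assign to the profile variable used later in this quickstart.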
Producing Messages
Open your favorite editor, such as Visual Studio Code, from the empty working directory wd.
Open the terminal and cd into the wd directory.
Create a C# .NET console application by running the following command in the
terminal:
dotnet new console
You should see a message indicating that the application was created:
The template "Console Application" was created successfully.
This creates a Program.cs file with C# code for a simple
"HelloWorld" application.
Add OCI SDK packages for basic IAM
authentication and Streaming to your C# project as
follows:
dotnet add package OCI.DotNetSDK.Common
dotnet add package OCI.DotNetSDK.Streaming
Replace the code in Program.cs in the wd directory with the following code. Replace the
values of the variables configurationFilePath, profile, ociStreamOcid, and
ociMessageEndpoint in the following code snippet with the values applicable to your
tenancy.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Oci.Common.Auth;
using Oci.Common.Waiters;
using Oci.StreamingService;
using Oci.StreamingService.Models;
using Oci.StreamingService.Requests;
using Oci.StreamingService.Responses;

namespace OssProducer
{
    class Program
    {
        public static async Task Main(string[] args)
        {
            Console.WriteLine("Starting example for OSS Producer");
            string configurationFilePath = "<config_file_path>";
            string profile = "<config_file_profile_name>";
            string ociStreamOcid = "<stream_OCID>";
            string ociMessageEndpoint = "<stream_message_endpoint>";

            try
            {
                var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);

                StreamClient streamClient = new StreamClient(provider);
                streamClient.SetEndpoint(ociMessageEndpoint);

                await PublishExampleMessages(streamClient, ociStreamOcid);
            }
            catch (Exception e)
            {
                Console.WriteLine($"Streaming example failed: {e}");
            }
        }

        private static async Task PublishExampleMessages(StreamClient streamClient, string streamId)
        {
            // build up a putRequest and publish some messages to the stream
            List<PutMessagesDetailsEntry> messages = new List<PutMessagesDetailsEntry>();
            for (int i = 0; i < 100; i++)
            {
                PutMessagesDetailsEntry detailsEntry = new PutMessagesDetailsEntry
                {
                    Key = Encoding.UTF8.GetBytes($"messagekey-{i}"),
                    Value = Encoding.UTF8.GetBytes($"messageValue-{i}")
                };
                messages.Add(detailsEntry);
            }

            Console.WriteLine($"Publishing {messages.Count} messages to stream {streamId}");
            PutMessagesDetails messagesDetails = new PutMessagesDetails
            {
                Messages = messages
            };
            PutMessagesRequest putRequest = new PutMessagesRequest
            {
                StreamId = streamId,
                PutMessagesDetails = messagesDetails
            };
            PutMessagesResponse putResponse = await streamClient.PutMessages(putRequest);

            // the putResponse can contain some useful metadata for handling failures
            foreach (PutMessagesResultEntry entry in putResponse.PutMessagesResult.Entries)
            {
                if (entry.Error != null)
                {
                    Console.WriteLine($"Error({entry.Error}): {entry.ErrorMessage}");
                }
                else
                {
                    Console.WriteLine($"Published message to partition {entry.Partition}, offset {entry.Offset}");
                }
            }
        }
    }
}
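The program above hard-codes its four placeholder values. If you would rather not edit Program.cs for each tenancy, one option is to read the values from environment variables and fall back to the placeholders. This is only a sketch: the QuickstartSettings class and the environment variable names shown in the usage note are hypothetical and are not defined by the SDK.

```csharp
using System;

// Hypothetical helper, not part of the OCI SDK.
public static class QuickstartSettings
{
    // Returns the environment variable's value, or the fallback when it is unset or blank.
    public static string FromEnvironment(string name, string fallback)
    {
        string value = Environment.GetEnvironmentVariable(name);
        return string.IsNullOrWhiteSpace(value) ? fallback : value;
    }
}
```

In Main, an assignment such as string ociStreamOcid = QuickstartSettings.FromEnvironment("OCI_STREAM_OCID", "<stream_OCID>"); would then pick up the value from the environment when it is set. The variable name OCI_STREAM_OCID is an illustrative choice.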
Consuming Messages
Open your favorite editor, such as Visual Studio Code, from the empty working directory wd.
Create a C# .NET console application by running the following command in the
terminal:
dotnet new console
You should see a message indicating that the application was created:
The template "Console Application" was created successfully.
This creates a Program.cs file with C# code for a simple
"HelloWorld" application.
Add OCI SDK packages for basic IAM
authentication and Streaming to your C# project as
follows:
dotnet add package OCI.DotNetSDK.Common
dotnet add package OCI.DotNetSDK.Streaming
Replace the code in Program.cs in the wd directory with the following code. Replace the
values of the variables configurationFilePath, profile, ociStreamOcid, and
ociMessageEndpoint in the following code snippet with the values applicable to your
tenancy.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Oci.Common.Auth;
using Oci.Common.Waiters;
using Oci.StreamingService;
using Oci.StreamingService.Models;
using Oci.StreamingService.Requests;
using Oci.StreamingService.Responses;

namespace OssConsumer
{
    class Program
    {
        public static async Task Main(string[] args)
        {
            Console.WriteLine("Starting example for OSS Consumer");
            string configurationFilePath = "<config_file_path>";
            string profile = "<config_file_profile_name>";
            string ociStreamOcid = "<stream_OCID>";
            string ociMessageEndpoint = "<stream_message_endpoint>";

            try
            {
                var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile);

                StreamClient streamClient = new StreamClient(provider);
                streamClient.SetEndpoint(ociMessageEndpoint);

                // A cursor can be created as part of a consumer group.
                // Committed offsets are managed for the group, and partitions
                // are dynamically balanced amongst consumers in the group.
                Console.WriteLine("Starting a simple message loop with a group cursor");
                string groupCursor = await GetCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
                await SimpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
            }
            catch (Exception e)
            {
                Console.WriteLine($"Streaming example failed: {e}");
            }
        }

        private static async Task<string> GetCursorByGroup(StreamClient streamClient, string streamId, string groupName, string instanceName)
        {
            Console.WriteLine($"Creating a cursor for group {groupName}, instance {instanceName}");

            CreateGroupCursorDetails createGroupCursorDetails = new CreateGroupCursorDetails
            {
                GroupName = groupName,
                InstanceName = instanceName,
                Type = CreateGroupCursorDetails.TypeEnum.TrimHorizon,
                CommitOnGet = true
            };
            CreateGroupCursorRequest createCursorRequest = new CreateGroupCursorRequest
            {
                StreamId = streamId,
                CreateGroupCursorDetails = createGroupCursorDetails
            };
            CreateGroupCursorResponse groupCursorResponse = await streamClient.CreateGroupCursor(createCursorRequest);

            return groupCursorResponse.Cursor.Value;
        }

        private static async Task SimpleMessageLoop(StreamClient streamClient, string streamId, string initialCursor)
        {
            string cursor = initialCursor;
            for (int i = 0; i < 10; i++)
            {
                GetMessagesRequest getMessagesRequest = new GetMessagesRequest
                {
                    StreamId = streamId,
                    Cursor = cursor,
                    Limit = 10
                };
                GetMessagesResponse getResponse = await streamClient.GetMessages(getMessagesRequest);

                // process the messages
                Console.WriteLine($"Read {getResponse.Items.Count}");
                foreach (Message message in getResponse.Items)
                {
                    string key = message.Key != null ? Encoding.UTF8.GetString(message.Key) : "Null";
                    Console.WriteLine($"{key} : {Encoding.UTF8.GetString(message.Value)}");
                }

                // GetMessages is a throttled method; clients should retrieve sufficiently large
                // message batches to avoid making too many HTTP requests.
                await Task.Delay(1000);

                // use the next-cursor for iteration
                cursor = getResponse.OpcNextCursor;
            }
        }
    }
}
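The message loop above waits a fixed one second between GetMessages calls. Because GetMessages is throttled, a consumer that often reads empty batches might prefer to wait longer between polls. The following sketch shows one possible approach, doubling the delay after each empty read up to a cap. The PollingDelay class and its parameters are hypothetical and are not part of the SDK; the right delays depend on your workload.

```csharp
using System;

// Hypothetical helper, not part of the OCI SDK.
public static class PollingDelay
{
    // Returns baseDelay when messages were read; otherwise doubles the current
    // delay (treating anything below baseDelay as baseDelay), capped at maxDelay.
    public static TimeSpan Next(TimeSpan current, int messagesRead, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (messagesRead > 0)
        {
            return baseDelay;
        }
        long startTicks = Math.Max(current.Ticks, baseDelay.Ticks);
        TimeSpan doubled = TimeSpan.FromTicks(startTicks * 2);
        return doubled > maxDelay ? maxDelay : doubled;
    }
}
```

Inside SimpleMessageLoop, you could keep a TimeSpan delay variable, update it with PollingDelay.Next(delay, getResponse.Items.Count, baseDelay, maxDelay) after each read, and pass the result to Task.Delay in place of the fixed 1000 milliseconds.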
From the wd directory, run the following command:
dotnet run
You should see messages similar to the following:
Starting example for OSS Consumer
Starting a simple message loop with a group cursor
Creating a cursor for group exampleGroup, instance exampleInstance-1
Read 10
messagekey-0 : messageValue-0
messagekey-1 : messageValue-1
messagekey-2 : messageValue-2
messagekey-3 : messageValue-3
messagekey-4 : messageValue-4
messagekey-5 : messageValue-5
messagekey-6 : messageValue-6
messagekey-7 : messageValue-7
messagekey-8 : messageValue-8
messagekey-9 : messageValue-9
Read 10