This quickstart shows you how to use the Oracle Cloud Infrastructure (OCI) SDK for Go and Oracle Cloud Infrastructure Streaming to publish and consume messages.
Refer to the Overview of Streaming for key concepts and more Streaming details. For more information about using the OCI SDKs, see the SDK Guides.
Prerequisites
To use the SDK for Go, you must have the following:
An Oracle Cloud Infrastructure account.
A user created in that account, in a group with a policy that grants the required
permissions. This user can be yourself, or another person/system that needs to call the
API. For an example of how to set up a new user, group, compartment, and policy, see
Adding Users. For a list of typical policies you
may want to use, see Common Policies.
A key pair used for signing API requests, with the public key uploaded to Oracle. Only
the user calling the API should possess the private key. For more information, see SDK configuration file.
Collect the Messages endpoint and OCID of a stream. See Listing Streams and Stream Pools for instructions on viewing stream details. For the purposes of this quickstart, the
stream should use a public endpoint and let Oracle manage encryption. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream.
Go installed locally. Follow these instructions if necessary. Ensure that go
is in your PATH.
Visual Studio Code (recommended) or any other integrated development environment (IDE) or
text editor.
Open your favorite editor, such as Visual Studio Code, from the empty working
directory wd.
Create a file named Producer.go in this directory.
Add the following code to Producer.go. Replace values of
variables ociConfigFilePath,
ociProfileName, ociStreamOcid, and
ociMessageEndpoint in the following code snippet with the
values applicable for your tenancy.
Copy
package main
import (
"context"
"fmt"
"strconv"
"github.com/oracle/oci-go-sdk/v36/common"
"github.com/oracle/oci-go-sdk/v36/example/helpers"
"github.com/oracle/oci-go-sdk/v36/streaming"
)
// Placeholders for the producer example; replace each with the value that
// applies to your tenancy before running.
const (
	// ociMessageEndpoint is the Messages endpoint of the stream.
	ociMessageEndpoint = "<stream_message_endpoint>"
	// ociStreamOcid is the OCID of the stream.
	ociStreamOcid = "<stream_OCID>"
	// ociConfigFilePath is the path of the SDK configuration file.
	ociConfigFilePath = "<config_file_path>"
	// ociProfileName is the profile to read from the configuration file.
	ociProfileName = "<config_file_profile_name>"
)
func main() {
fmt.Println("Go oci oss sdk example producer")
putMsgInStream(ociMessageEndpoint, ociStreamOcid)
}
// putMsgInStream sends five put requests, each carrying a batch of two dummy
// messages, to the stream identified by streamOcid through the Messages
// endpoint streamEndpoint.
//
// Credentials come from the profile ociProfileName in the configuration file
// at ociConfigFilePath. Every error (building the provider, creating the
// client, sending a batch) is passed to helpers.FatalIfError. Each response is
// printed to standard output.
func putMsgInStream(streamEndpoint string, streamOcid string) {
	fmt.Println("Stream endpoint for put msg api is: " + streamEndpoint)

	cfg, err := common.ConfigurationProviderFromFileWithProfile(ociConfigFilePath, ociProfileName, "")
	helpers.FatalIfError(err)

	client, err := streaming.NewStreamClientWithConfigurationProvider(cfg, streamEndpoint)
	helpers.FatalIfError(err)

	ctx := context.Background()
	const batches = 5
	for batch := 0; batch < batches; batch++ {
		seq := strconv.Itoa(batch)

		// Two messages are batched into every put request: same value,
		// distinct keys.
		entries := []streaming.PutMessagesDetailsEntry{
			{Key: []byte("key dummy-0-" + seq), Value: []byte("value dummy-" + seq)},
			{Key: []byte("key dummy-1-" + seq), Value: []byte("value dummy-" + seq)},
		}
		req := streaming.PutMessagesRequest{
			StreamId:           common.String(streamOcid),
			PutMessagesDetails: streaming.PutMessagesDetails{Messages: entries},
		}

		// Send the batch and print the raw response.
		resp, err := client.PutMessages(ctx, req)
		helpers.FatalIfError(err)
		fmt.Println(resp)
	}
}
Save Producer.go.
Open the terminal, cd to the wd
directory, and run the following commands in order:
This command creates the go.mod file in the
wd directory:
Copy
go mod init oss_producer_example/v0
This command downloads the OCI SDK for Go modules that the code imports,
including Streaming, and records them in go.mod:
Copy
go mod tidy
This command runs the example:
Copy
go run Producer.go
Use the Console to see the latest messages sent to the stream to verify
that production was successful.
Add the following code to Consumer.go. Replace values of variables
ociConfigFilePath,
ociProfileName, ociStreamOcid, and
ociMessageEndpoint in the following code snippet with the values
applicable for your tenancy.
Copy
package main
import (
"context"
"fmt"
"github.com/oracle/oci-go-sdk/v36/common"
"github.com/oracle/oci-go-sdk/v36/example/helpers"
"github.com/oracle/oci-go-sdk/v36/streaming"
)
// Placeholders for the consumer example; replace each with the value that
// applies to your tenancy before running.
const (
	// ociMessageEndpoint is the Messages endpoint of the stream.
	ociMessageEndpoint = "<stream_message_endpoint>"
	// ociStreamOcid is the OCID of the stream.
	ociStreamOcid = "<stream_OCID>"
	// ociConfigFilePath is the path of the SDK configuration file.
	ociConfigFilePath = "<config_file_path>"
	// ociProfileName is the profile to read from the configuration file.
	ociProfileName = "<config_file_profile_name>"
)
func main() {
fmt.Println("Go oci oss sdk example for consumer")
getMsgWithGroupCursor(ociMessageEndpoint, ociStreamOcid)
}
func getMsgWithGroupCursor(streamEndpoint string, streamOcid string) {
client, err := streaming.NewStreamClientWithConfigurationProvider(common.DefaultConfigProvider(), streamEndpoint)
helpers.FatalIfError(err)
grpCursorCreateReq0 := streaming.CreateGroupCursorRequest{
StreamId: common.String(streamOcid),
CreateGroupCursorDetails: streaming.CreateGroupCursorDetails{Type: streaming.CreateGroupCursorDetailsTypeTrimHorizon,
CommitOnGet: common.Bool(true),
GroupName: common.String("Go-groupname-0"),
InstanceName: common.String("Go-groupname-0-instancename-0"),
TimeoutInMs: common.Int(1000),
}}
// Send the request using the service client
grpCursorResp0, err := client.CreateGroupCursor(context.Background(), grpCursorCreateReq0)
helpers.FatalIfError(err)
// Retrieve value from the response.
fmt.Println(grpCursorResp0)
simpleGetMsgLoop(client, streamOcid, *grpCursorResp0.Value)
}
// simpleGetMsgLoop issues up to five GetMessages calls against the stream
// identified by streamOcid, starting at cursorValue and continuing from the
// next cursor returned by each call. Every message received is printed to
// standard output as its key, value, and partition.
//
// Errors from GetMessages are passed to helpers.FatalIfError. The loop stops
// early if a response carries no next cursor.
func simpleGetMsgLoop(streamClient streaming.StreamClient, streamOcid string, cursorValue string) {
	for i := 0; i < 5; i++ {
		getMsgReq := streaming.GetMessagesRequest{
			Limit:    common.Int(3),
			StreamId: common.String(streamOcid),
			Cursor:   common.String(cursorValue),
		}

		// Send the request using the service client.
		getMsgResp, err := streamClient.GetMessages(context.Background(), getMsgReq)
		helpers.FatalIfError(err)

		// Print every message in the batch. The request allows up to three
		// messages, so printing only the first two would silently drop one.
		for _, msg := range getMsgResp.Items {
			partition := ""
			if msg.Partition != nil {
				partition = *msg.Partition
			}
			fmt.Println("Key : " + string(msg.Key) + ", value : " + string(msg.Value) + ", Partition " + partition)
		}

		// Without a next cursor there is nothing to continue from.
		if getMsgResp.OpcNextCursor == nil {
			return
		}
		cursorValue = *getMsgResp.OpcNextCursor
	}
}
Save Consumer.go.
Open the terminal, cd to the wd directory, and run
the following commands in order:
This command creates the go.mod file in the
wd directory:
Copy
go mod init oss_consumer_example/v0
This command downloads the OCI SDK for Go modules that the code imports,
including Streaming, and records them in go.mod:
Copy
go mod tidy
This command runs the example:
Copy
go run Consumer.go
You should see messages similar to the following:
Go oci oss sdk example for consumer
{ RawResponse={200 OK 200 HTTP/1.1 1 1 map[Access-Control-Allow-Credentials:[true] ... }
Key : , value : Example Test Message 0, Partition 0
Key : , value : Example Test Message 0, Partition 0
Key : , value : Example Test Message 0, Partition 0
Key : , value : Example Test Message 0, Partition 0
Key : , value : Example Test Message 0, Partition 0