This page provides code samples for configuring client libraries to connect to clusters in Memorystore for Redis Cluster.
Client library code sample
This section shows a Lettuce client library code sample for connecting to a cluster. For this sample, the cluster doesn't use Identity and Access Management (IAM) authentication or in-transit encryption.
We recommend using Lettuce, version 6.2.2 or later.
Lettuce
// Create RedisURI from the MRC discovery endpoint RedisURI redisUri = RedisURI.Builder.redis(CLUSTER_DISC_EP_ADDR, CLUSTER_DISC_EP_PORT).build(); // Configure client' resources // Configure reconnectDelay with exponential backoff and full jitter ClientResources resources = DefaultClientResources.builder() .reconnectDelay(Delay.fullJitter( Duration.ofMillis(100), // minimum 100 millisecond delay Duration.ofSeconds(5), // maximum 5 second delay 100, TimeUnit.MILLISECONDS) // 100 millisecond base ).build(); // Create a cluster client with the URI and resources RedisClusterClient clusterClient = RedisClusterClient.create(resources, redisUri); // Configure the topology refreshment options // Enable periodic cluster topology updates so that the client updates the cluster topology in the intervals of // 60 seconds // Enable adaptive topology refresh that uses all triggers: MOVED_REDIRECT, ASK_REDIRECT, // PERSISTENT_RECONNECTS, UNCOVERED_SLOT, UNKNOWN_NODE // Disable dynamicRefreshSources so that only the initial seed nodes (Memorystore for Redis Cluster // discovery endpoint) will be used as the source for topology discovery // Enable closing stale connections when refreshing the cluster topology. This reduces the need to handle // failed connections during command runtime. ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enablePeriodicRefresh(1, TimeUnit.MINUTES) .enableAllAdaptiveRefreshTriggers() .dynamicRefreshSources(false) .closeStaleConnections(true) .build(); // Configure the socket options // Set connectTimeout based on your application requirements and workload // Enable TCP keepAlive to reduce the need to handle failed connections during command runtime SocketOptions socketOptions = SocketOptions.builder() .connectTimeout(CONNECT_TIMEOUT) .keepAlive(true) .build(); // Configure the client options // Enable AutoReconnect when connection is lost // Set nodeFilter to filter out failed nodes from the topology // Disable validateClusterNodeMembership to allow redirecting commands to newly added nodes clusterClient.setOptions(ClusterClientOptions.builder() .topologyRefreshOptions(topologyRefreshOptions) .socketOptions(socketOptions) .autoReconnect(true) .nodeFilter(it -> ! (it.is(RedisClusterNode.NodeFlag.FAIL) || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL) || it.is(RedisClusterNode.NodeFlag.NOADDR))) .validateClusterNodeMembership(false) .build()); // Create a connection pool GenericObjectPool<StatefulRedisClusterConnection<String, String> pool = ConnectionPoolSupport.createGenericObjectPool(() -> clusterClient.connect(), new GenericObjectPoolConfig()); pool.setMaxTotal(MAX_CONNECTIONS_IN_CONNECTION_POOL); // Get a connection from the connection pool StatefulRedisClusterConnection<String, String> connection = pool.borrowObject(); // Get a cluster sync command and call 'set' RedisAdvancedClusterCommands<String, String> syncCommands = connection.sync(); syncCommands.set(key, value);
In-transit encryption client library code sample
This section gives an example of client code for authenticating with in-transit encryption for your Memorystore cluster with the go-redis client library.
go-redis
import ( "context" "crypto/tls" "crypto/x509" "io/ioutil" "log" "time" "github.com/go-redis/redis/v9" ) func example() { // Load CA cert caFilePath :=caCert, err := ioutil.ReadFile(caFilePath) if err != nil { log.Fatal(err) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) // Setup Redis Connection pool client := redis.NewClusterClient(&redis.ClusterOptions{ Addrs: []string{"CLUSTER_DISC_EP_ADDR:CLUSTER_DISC_EP_PORT"}, // PoolSize applies per cluster node and not for the whole cluster. PoolSize: 10, ConnMaxIdleTime: 60 * time.Second, MinIdleConns: 1, TLSConfig: &tls.Config{ RootCAs: caCertPool, }, }) ctx := context.Background() err = client.Set(ctx, "key", "value", 0).Err() if err != nil { log.Fatal(err) } }
IAM authentication and in-transit encryption code sample
This section gives an example of how to authenticate and connect to a cluster by using both IAM authentication and in-transit encryption with various client libraries:
redis-py
from google.cloud import iam_credentials_v1 from redis.backoff import ConstantBackoff from redis.retry import Retry from redis.exceptions import ( ConnectionError, AuthenticationWrongNumberOfArgsError, AuthenticationError ) from redis.utils import (str_if_bytes) import redis service_account="projects/-/serviceAccounts/<TO-DO-1: your service account that used to authenticate to Redis Cluster>" host=<TO-DO-2: your Redis Cluster discovery endpoint ip> ssl_ca_certs=<TO-DO-3, your trusted server ca file name> def generate_access_token(): # Create a client client = iam_credentials_v1.IAMCredentialsClient() # Initialize request argument(s) request = iam_credentials_v1.GenerateAccessTokenRequest( name=service_account, scope=['https://www.googleapis.com/auth/cloud-platform'], ) # Make the request response = client.generate_access_token(request=request) # Handle the response return str(response.access_token) def iam_connect(self): "Initialize the connection and authenticate" self._parser.on_connect(self) auth_args = (generate_access_token()) self.send_command("AUTH", *auth_args, check_health=False) try: auth_response = self.read_response() except AuthenticationWrongNumberOfArgsError: self.send_command("AUTH", self.password, check_health=False) auth_response = self.read_response() if str_if_bytes(auth_response) != "OK": raise AuthenticationError("Invalid Username or Password") # Connect to Memorystore for Redis Cluster backoff = ConstantBackoff(3) retry = Retry(retries=-1, backoff=backoff, supported_errors=(ConnectionError, ConnectionResetError)) r=redis.cluster.RedisCluster(host=host, port=6379,redis_connect_func=iam_connect, retry=retry, ssl=True, ssl_ca_certs=ssl_ca_certs) print(r.get('key'))
Lettuce
import com.google.cloud.iam.credentials.v1.GenerateAccessTokenResponse; import com.google.cloud.iam.credentials.v1.IamCredentialsClient; import io.lettuce.core.RedisCredentials; import io.lettuce.core.RedisCredentialsProvider; import io.lettuce.core.RedisURI; import io.lettuce.core.SocketOptions; import io.lettuce.core.SslOptions; import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.DefaultClientResources; import io.lettuce.core.resource.Delay; import java.io.Closeable; import java.io.File; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import reactor.core.publisher.Mono; public class IAMAuth { /** * This thread-safe implementation (excluding the main app below) is intended for production use. * It provides a background refresh logic that shouldn't overload IAM service in case. the * application has many many connections (connection storms can result in IAM throttles * otherwise).
*
* Guidelines for implementing similar logic for other clients:
* 1. Refresh IAM tokens in the background using a single thread/routine per client process
* 2. Provide last error feedback inline for token retrieval to aid debugging
* 3. Provide initial setup validation by fast-failing if the token couldn't be retrieved
* 4. Inline getToken shouldn't execute direct IAM calls as it can overload the token retrieval * resulting in throttles
* 5. Typical scale is tens of thousands of Redis connections and the IAM token is required for * every connection being established.
*/ private static final class RedisClusterCredentialsProvider implements RedisCredentialsProvider, Runnable, Closeable { private static final Logger logger = Logger.getLogger(RedisClusterCredentialsProvider.class.getName()); private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); private final IamCredentialsClient iamClient; private final String accountName; private final Duration refreshDuration; private final Duration lifetime; private volatile RedisCredentials credentials; private volatile Instant lastRefreshInstant; private volatile Exception lastException; /** * AccountName: * "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com"; * RefreshDuration: Duration.ofSeconds(300) Lifetime: Duration.ofSeconds(3600); */ public RedisClusterCredentialsProvider( String accountName, Duration refreshDuration, Duration lifetime) throws Exception { this.iamClient = IamCredentialsClient.create(); this.accountName = accountName; this.refreshDuration = refreshDuration; this.lifetime = lifetime; // execute on initialization to fast-fail if there are any setup issues refreshTokenNow(); // refresh much more frequently than the expiry time to allow for multiple retries in case of // failures service.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS); } @Override public MonoresolveCredentials() { if (hasTokenExpired()) { throw new RuntimeException("Background IAM token refresh failed", lastException); } return Mono.just(this.credentials); } private boolean hasTokenExpired() { if (this.lastRefreshInstant == null || this.lifetime == null) { return true; } return Instant.now().isAfter(this.lastRefreshInstant.plus(this.lifetime)); } // To be invoked by customer app on shutdown @Override public void close() { service.shutdown(); iamClient.close(); } @Override public void run() { try { // fetch token if it is time to refresh if (this.lastRefreshInstant != null && this.refreshDuration != null && Instant.now().isBefore(this.lastRefreshInstant.plus(this.refreshDuration))) { // nothing to do return; } refreshTokenNow(); } catch (Exception e) { // suppress all errors as we cannot allow the task to die // log for visibility logger.log(Level.parse("SEVERE"), "Background IAM token refresh failed", e); } } private void refreshTokenNow() { try { logger.info("Refreshing IAM token"); com.google.protobuf.Duration lifetimeProto = com.google.protobuf.Duration.newBuilder() .setSeconds(lifetime.getSeconds()) .setNanos(lifetime.getNano()) .build(); GenerateAccessTokenResponse response = this.iamClient.generateAccessToken( this.accountName, new ArrayList<>(), Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"), lifetimeProto); // got a successful token refresh this.credentials = new RedisCredentials() { @Override public boolean hasUsername() { return false; } @Override public boolean hasPassword() { return true; } @Override public String getUsername() { return "default"; } @Override public char[] getPassword() { return response.getAccessToken().toCharArray(); } }; this.lastRefreshInstant = Instant.now(); // clear the last saved exception this.lastException = null; logger.info( "IAM token refreshed with lastRefreshInstant [" + lastRefreshInstant + "], refreshDuration [" + this.refreshDuration + "], accountName [" + this.accountName + "] and lifetime [" + this.lifetime + "]"); } catch (Exception e) { // Save last exception for inline feedback this.lastException = e; // Bubble up for direct feedback throw e; } } } /** Sample code to demonstrate how to use IAMAuth; not intended for production use */ public static void main(String[] args) throws Exception { // These are the parameters the user needs to replace String discoveryEndpointIp = "CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS"; int discoveryEndpointPort = CLUSTER_DISCOVERY_ENDPOINT_PORT_NUMBER; String accountName = "ACCOUNT_NAME"; String caFileName = "CA_FILE_NAME"; int refreshDurationSec = REFRESH_DURATION_SEC; int lifetimeSec = LIFETIME_SEC; RedisCredentialsProvider credentialsProvider = new RedisClusterCredentialsProvider( accountName, Duration.ofSeconds(refreshDurationSec), Duration.ofSeconds(lifetimeSec)); RedisURI redisUri = RedisURI.Builder.redis(discoveryEndpointIp, discoveryEndpointPort) .withSsl(true) .withAuthentication(credentialsProvider) .build(); ClientResources resources = DefaultClientResources.builder() .reconnectDelay( Delay.fullJitter( Duration.ofMillis(100), Duration.ofSeconds(5), 100, TimeUnit.MILLISECONDS)) .build(); ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enablePeriodicRefresh(1, TimeUnit.MINUTES) .enableAllAdaptiveRefreshTriggers() .dynamicRefreshSources(false) .closeStaleConnections(true) .build(); SslOptions sslOptions = SslOptions.builder().jdkSslProvider().trustManager(new File(caFileName)).build(); SocketOptions socketOptions = SocketOptions.builder().connectTimeout(Duration.ofSeconds(5)).keepAlive(true).build(); // Create Redis Cluster Client RedisClusterClient clusterClient = RedisClusterClient.create(resources, redisUri); clusterClient.setOptions( ClusterClientOptions.builder() .topologyRefreshOptions(topologyRefreshOptions) .socketOptions(socketOptions) .sslOptions(sslOptions) .autoReconnect(true) .nodeFilter( it -> !(it.is(RedisClusterNode.NodeFlag.FAIL) || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL) || it.is(RedisClusterNode.NodeFlag.NOADDR))) .validateClusterNodeMembership(false) .build()); // Establish connection to Redis Cluster StatefulRedisClusterConnection<String, String> connection = clusterClient.connect(); // Retrieve synchronous Redis Cluster commands RedisAdvancedClusterCommands<String, String> syncCommands = connection.sync(); // Perform Redis operations syncCommands.set("key1", "value1"); String value = syncCommands.get("key1"); System.out.println("Retrieved value: " + value); int count = 0; for (int i = 0; i < 1000; i++) { String k = "lettucekey" + String.valueOf(i); String v = "lettucevalue" + String.valueOf(i); syncCommands.set(k, v); String got = syncCommands.get(k); if (got.equals(v)) { count++; } else { System.out.println("unexpected value"); } } System.out.println("Successfully got " + String.valueOf(count) + " keys"); // Close the connection and shutdown the client connection.close(); clusterClient.shutdown(); ((Closeable) credentialsProvider).close(); } }
Jedis
import com.google.cloud.iam.credentials.v1.GenerateAccessTokenResponse; import com.google.cloud.iam.credentials.v1.IamCredentialsClient; import java.io.Closeable; import java.io.FileInputStream; import java.io.InputStream; import java.security.KeyStore; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.Connection; import redis.clients.jedis.DefaultJedisClientConfig; import redis.clients.jedis.DefaultRedisCredentials; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.RedisCredentials; import redis.clients.jedis.RedisCredentialsProvider; /** Customers are free to update/replace code as they see fit. */ public class IAMAuth { /** * This thread-safe implementation (excluding the main app below) is intended for production use. * It provides a background refresh logic that shouldn't overload IAM service in case. the * application has many many connections (connection storms can result in IAM throttles * otherwise).
*
* Guidelines for implementing similar logic for other clients:
* 1. Refresh IAM tokens in the background using a single thread/routine per client process
* 2. Provide last error feedback inline for token retrieval to aid debugging
* 3. Provide initial setup validation by fast-failing if the token couldn't be retrieved
* 4. Inline getToken shouldn't execute direct IAM calls as it can overload the token retrieval * resulting in throttles
* 5. Typical scale is tens of thousands of Redis connections and the IAM token is required for * every connection being established.
*/ private static final class RedisClusterCredentialsProvider implements RedisCredentialsProvider, Runnable, Closeable { private static final Logger logger = Logger.getLogger(RedisClusterCredentialsProvider.class.getName()); private final IamCredentialsClient iamClient; private final ScheduledExecutorService service; private final String accountName; private final Duration refreshDuration; private final Duration lifetime; private volatile RedisCredentials credentials; private volatile Instant lastRefreshInstant; private volatile Exception lastException; // AccountName: // "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com"; // RefreshDuration: Duration.ofSeconds(300); // Lifetime: Duration.ofSeconds(3600); public RedisClusterCredentialsProvider( String accountName, Duration refreshDuration, Duration lifetime) throws Exception { this.iamClient = IamCredentialsClient.create(); this.service = Executors.newSingleThreadScheduledExecutor(); this.accountName = accountName; this.refreshDuration = refreshDuration; this.lifetime = lifetime; // execute on initialization to fast-fail if there are any setup issues refreshTokenNow(); // refresh much more frequently than the expiry time to allow for multiple retries in case of // failures service.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS); } public RedisCredentials get() { if (hasTokenExpired()) { throw new RuntimeException("Background IAM token refresh failed", lastException); } return this.credentials; } private boolean hasTokenExpired() { if (this.lastRefreshInstant == null || this.lifetime == null) { return true; } return Instant.now().isAfter(this.lastRefreshInstant.plus(this.lifetime)); } // To be invoked by customer app on shutdown @Override public void close() { service.shutdown(); iamClient.close(); } @Override public void run() { try { // fetch token if it is time to refresh if (this.lastRefreshInstant != null && this.refreshDuration != null && Instant.now().isBefore(this.lastRefreshInstant.plus(this.refreshDuration))) { // nothing to do return; } refreshTokenNow(); } catch (Exception e) { // suppress all errors as we cannot allow the task to die // log for visibility logger.log(Level.parse("SEVERE"), "Background IAM token refresh failed", e); } } private void refreshTokenNow() { try { logger.info("Refreshing IAM token"); Listdelegates = new ArrayList<>(); com.google.protobuf.Duration lifetimeProto = com.google.protobuf.Duration.newBuilder() .setSeconds(lifetime.getSeconds()) .setNanos(lifetime.getNano()) .build(); GenerateAccessTokenResponse response = iamClient.generateAccessToken( this.accountName, delegates, Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"), lifetimeProto); // got a successful token refresh this.credentials = new DefaultRedisCredentials("default", response.getAccessToken()); this.lastRefreshInstant = Instant.now(); // clear the last saved exception this.lastException = null; logger.info( "IAM token refreshed with lastRefreshInstant [" + lastRefreshInstant + "], refreshDuration [" + this.refreshDuration + "], accountName [" + this.accountName + "] and lifetime [" + this.lifetime + "]"); } catch (Exception e) { // Save last exception for inline feedback this.lastException = e; // Bubble up for direct feedback throw e; } } } /** Sample code to demonstrate how to use IAMAuth; not intended for production use */ public static void main(String[] args) throws Exception { String discoveryEndpointIp = "CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS"; int discoveryEndpointPort = CLUSTER_DISCOVERY_ENDPOINT_PORT_NUMBER; GenericObjectPoolConfig config = new GenericObjectPoolConfig (); config.setTestWhileIdle(true); int timeout = 5000; int maxAttempts = 5; HostAndPort discovery = new HostAndPort(discoveryEndpointIp, discoveryEndpointPort); RedisCredentialsProvider credentialsProvider = new RedisClusterCredentialsProvider( "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com", Duration.ofSeconds(300), Duration.ofSeconds(3600)); // Create JedisCluster cluster InputStream is = new FileInputStream("server-ca.pem"); // You could get a resource as a stream instead. CertificateFactory cf = CertificateFactory.getInstance("X.509"); X509Certificate caCert = (X509Certificate) cf.generateCertificate(is); TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); ks.load(null); // You don't need the KeyStore cluster to come from a file. ks.setCertificateEntry("caCert", caCert); tmf.init(ks); SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, tmf.getTrustManagers(), null); JedisCluster jedisCluster = new JedisCluster( discovery, DefaultJedisClientConfig.builder() .connectionTimeoutMillis(timeout) .socketTimeoutMillis(timeout) .credentialsProvider(credentialsProvider) .ssl(true) .sslSocketFactory(sslContext.getSocketFactory()) .build(), maxAttempts, config); // Perform operations on the cluster jedisCluster.set("myKey", "Hello, Redis Cluster!"); String value = jedisCluster.get("myKey"); System.out.println("Value for myKey: " + value); int count = 0; for (int i = 0; i < 1000; i++) { String k = "jediskey" + String.valueOf(i); String v = "jedisvalue" + String.valueOf(i); jedisCluster.set(k, v); String got = jedisCluster.get(k); if (got.equals(v)) { count++; } else { System.out.println("unexpected value"); } } System.out.println("Successfully got " + String.valueOf(count) + " keys"); // Disconnect from the cluster jedisCluster.close(); // Cleanup the resources used by the provider ((Closeable) credentialsProvider).close(); } }
Go
package main import ( "context" "crypto/tls" "crypto/x509" "flag" "fmt" "io/ioutil" "log" "sync" "time" credentials "google.golang.org/genproto/googleapis/iam/credentials/v1" "github.com/golang/protobuf/ptypes" "github.com/redis/go-redis/v9" "google.golang.org/api/option" gtransport "google.golang.org/api/transport/grpc" ) var ( svcAccount = flag.String("a", "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com", "service account email") lifetime = flag.Duration("d", time.Hour, "lifetime of token") refreshDuration = flag.Duration("r", 5*time.Minute, "token refresh duration") checkTokenExpiryInterval = flag.Duration("e", 10*time.Second, "check token expiry interval") lastRefreshInstant = time.Time{} errLastSeen = error(nil) token = "" mu = sync.RWMutex{} ) func retrieveToken() (string, error) { ctx := context.Background() conn, err := gtransport.Dial(ctx, option.WithEndpoint("iamcredentials.googleapis.com:443"), option.WithScopes("https://www.googleapis.com/auth/cloud-platform")) if err != nil { log.Printf("Failed to dial API, error: %v", err) return token, err } client := credentials.NewIAMCredentialsClient(conn) req := credentials.GenerateAccessTokenRequest{ Name: *svcAccount, Scope: []string{"https://www.googleapis.com/auth/cloud-platform"}, Lifetime: ptypes.DurationProto(*lifetime), } rsp, err := client.GenerateAccessToken(ctx, &req) if err != nil { log.Printf"Failed to call GenerateAccessToken with request: %v, error: %v", req, err) return token, err } return rsp.AccessToken, nil } func refreshTokenLoop() { if *refreshDuration > *lifetime { log.Fatal("Refresh should not happen after token is already expired.") } for { mu.RLock() lastRefreshTime := lastRefreshInstant mu.RUnlock() if time.Now().After(lastRefreshTime.Add(*refreshDuration)) { var err error retrievedToken, err := retrieveToken() mu.Lock() token = retrievedToken if err != nil { errLastSeen = err } else { lastRefreshInstant = time.Now() } mu.Unlock() } time.Sleep(*checkTokenExpiryInterval) } } func retrieveTokenFunc() (string, string) { mu.RLock() defer mu.RUnlock() if time.Now().After(lastRefreshInstant.Add(*refreshDuration)) { log.Printf("Token is expired. last refresh instant: %v, refresh duration: %v, error that was last seen: %v", lastRefreshInstant, *refreshDuration, errLastSeen) return "", "" } username := "default" password := token return username, password } func main() { // Load CA cert caFilePath := CA_FILE_PATH clusterDicEpAddr := CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS_AND_PORT caCert, err := ioutil.ReadFile(caFilePath) if err != nil { log.Fatal(err) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) token, err = retrieveToken() if err != nil { log.Fatal("Cannot retrieve IAM token to authenticate to the cluster, error: %v", err) } lastRefreshInstant = time.Now() go refreshTokenLoop() // Setup Redis Connection pool client := redis.NewClusterClient(&redis.ClusterOptions{ Addrs: []string{clusterDicEpAddr}, // PoolSize applies per cluster node and not for the whole cluster. PoolSize: 10, ConnMaxIdleTime: 60 * time.Second, MinIdleConns: 1, CredentialsProvider: retrieveTokenFunc, TLSConfig: &tls.Config{ RootCAs: caCertPool, }, }) ctx := context.Background() err = client.Set(ctx, "key", "value", 0).Err() if err != nil { log.Fatal(err) } val, err := client.Get(ctx, "key").Result() if err != nil { log.Fatal(err) } fmt.Printf("Got the value for key: key, which is %s \n", val) }