Halaman ini menyediakan contoh kode untuk mengonfigurasi library klien agar terhubung ke cluster di Memorystore for Redis Cluster.
Contoh kode library klien
Bagian ini menampilkan contoh kode library klien Lettuce untuk terhubung ke cluster. Untuk contoh ini, cluster tidak menggunakan autentikasi Identity and Access Management (IAM) atau enkripsi saat transit.
Selada
Sebaiknya gunakan Lettuce, versi 6.2.4 dan yang lebih baru.
// Create RedisURI from the MRC discovery endpoint RedisURI redisUri = RedisURI.Builder.redis(CLUSTER_DISC_EP_ADDR, CLUSTER_DISC_EP_PORT).build(); // Configure client' resources // Configure reconnectDelay with exponential backoff and full jitter ClientResources resources = DefaultClientResources.builder() .reconnectDelay(Delay.fullJitter( Duration.ofMillis(100), // minimum 100 millisecond delay Duration.ofSeconds(5), // maximum 5 second delay 100, TimeUnit.MILLISECONDS) // 100 millisecond base ).build(); // Create a cluster client with the URI and resources RedisClusterClient clusterClient = RedisClusterClient.create(resources, redisUri); // Configure the topology refreshment options // Enable periodic cluster topology updates so that the client updates the cluster topology in the intervals of // 60 seconds // Enable adaptive topology refresh that uses all triggers: MOVED_REDIRECT, ASK_REDIRECT, // PERSISTENT_RECONNECTS, UNCOVERED_SLOT, UNKNOWN_NODE // Disable dynamicRefreshSources so that only the initial seed nodes (Memorystore for Redis Cluster // discovery endpoint) will be used as the source for topology discovery // Enable closing stale connections when refreshing the cluster topology. This reduces the need to handle // failed connections during command runtime. ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enablePeriodicRefresh(1, TimeUnit.MINUTES) .enableAllAdaptiveRefreshTriggers() .dynamicRefreshSources(false) .closeStaleConnections(true) .build(); // Configure the socket options // Set connectTimeout based on your application requirements and workload // Enable TCP keepAlive to reduce the need to handle failed connections during command runtime SocketOptions socketOptions = SocketOptions.builder() .connectTimeout(CONNECT_TIMEOUT) .keepAlive(true) .build(); // Configure the client options // Enable AutoReconnect when connection is lost // Set nodeFilter to filter out failed nodes from the topology // Disable validateClusterNodeMembership to allow redirecting commands to newly added nodes clusterClient.setOptions(ClusterClientOptions.builder() .topologyRefreshOptions(topologyRefreshOptions) .socketOptions(socketOptions) .autoReconnect(true) .nodeFilter(it -> ! (it.is(RedisClusterNode.NodeFlag.FAIL) || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL) || it.is(RedisClusterNode.NodeFlag.NOADDR))) .validateClusterNodeMembership(false) .build()); // Create a connection pool GenericObjectPool<StatefulRedisClusterConnection<String, String> pool = ConnectionPoolSupport.createGenericObjectPool(() -> clusterClient.connect(), new GenericObjectPoolConfig()); pool.setMaxTotal(MAX_CONNECTIONS_IN_CONNECTION_POOL); // Get a connection from the connection pool StatefulRedisClusterConnection<String, String> connection = pool.borrowObject(); // Get a cluster sync command and call 'set' RedisAdvancedClusterCommands<String, String> syncCommands = connection.sync(); syncCommands.set(key, value);
Contoh kode library klien enkripsi dalam transit
Bagian ini memberikan contoh kode klien untuk mengautentikasi dengan enkripsi dalam transit untuk cluster Memorystore Anda dengan library klien go-redis.
go-redis
Sebaiknya gunakan go-redis, versi 9.11.0 dan yang lebih baru.
import ( "context" "crypto/tls" "crypto/x509" "io/ioutil" "log" "time" "github.com/go-redis/redis/v9" ) func example() { // Load CA cert caFilePath :=caCert, err := ioutil.ReadFile(caFilePath) if err != nil { log.Fatal(err) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) // Setup Redis Connection pool client := redis.NewClusterClient(&redis.ClusterOptions{ Addrs: []string{"CLUSTER_DISC_EP_ADDR:CLUSTER_DISC_EP_PORT"}, // PoolSize applies per cluster node and not for the whole cluster. PoolSize: 10, ConnMaxIdleTime: 60 * time.Second, MinIdleConns: 1, TLSConfig: &tls.Config{ RootCAs: caCertPool, }, }) ctx := context.Background() err = client.Set(ctx, "key", "value", 0).Err() if err != nil { log.Fatal(err) } }
Contoh kode enkripsi saat transit dan autentikasi IAM
Bagian ini memberikan contoh cara mengautentikasi dan menghubungkan ke cluster dengan menggunakan autentikasi IAM dan enkripsi dalam transit dengan berbagai library klien:
redis-py
Sebaiknya gunakan redis-py, versi 5.1 dan yang lebih baru.
from google.cloud import iam_credentials_v1 from redis.backoff import ConstantBackoff from redis.retry import Retry from redis.exceptions import ( ConnectionError, AuthenticationWrongNumberOfArgsError, AuthenticationError ) from redis.utils import (str_if_bytes) import redis service_account="projects/-/serviceAccounts/<TO-DO-1: email of service account used to authenticate to Redis Cluster>" host=<TO-DO-2: your Redis Cluster discovery endpoint ip> ssl_ca_certs=<TO-DO-3, your trusted server ca file name> def generate_access_token(): # Create a client client = iam_credentials_v1.IAMCredentialsClient() # Initialize request argument(s) request = iam_credentials_v1.GenerateAccessTokenRequest( name=service_account, scope=['https://www.googleapis.com/auth/cloud-platform'], ) # Make the request response = client.generate_access_token(request=request) # Handle the response return str(response.access_token) def iam_connect(self): "Initialize the connection and authenticate" self._parser.on_connect(self) auth_args = (generate_access_token(),) self.send_command("AUTH", *auth_args, check_health=False) try: auth_response = self.read_response() except AuthenticationWrongNumberOfArgsError: self.send_command("AUTH", self.password, check_health=False) auth_response = self.read_response() if str_if_bytes(auth_response) != "OK": raise AuthenticationError("Invalid Username or Password") # Connect to Memorystore for Redis Cluster backoff = ConstantBackoff(3) retry = Retry(retries=-1, backoff=backoff, supported_errors=(ConnectionError, ConnectionResetError)) r=redis.cluster.RedisCluster(host=host, port=6379,redis_connect_func=iam_connect, retry=retry, ssl=True, ssl_ca_certs=ssl_ca_certs) print(r.get('key'))
Selada
Sebaiknya gunakan Lettuce, versi 6.2.4 dan yang lebih baru.
import com.google.cloud.iam.credentials.v1.GenerateAccessTokenResponse; import com.google.cloud.iam.credentials.v1.IamCredentialsClient; import io.lettuce.core.RedisCredentials; import io.lettuce.core.RedisCredentialsProvider; import io.lettuce.core.RedisURI; import io.lettuce.core.SocketOptions; import io.lettuce.core.SslOptions; import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.DefaultClientResources; import io.lettuce.core.resource.Delay; import java.io.Closeable; import java.io.File; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import reactor.core.publisher.Mono; public class IAMAuth { /** * This thread-safe implementation (excluding the main app below) is intended for production use. * It provides a background refresh logic that shouldn't overload IAM service in case. the * application has many many connections (connection storms can result in IAM throttles * otherwise).
*
* Guidelines for implementing similar logic for other clients:
* 1. Refresh IAM tokens in the background using a single thread/routine per client process
* 2. Provide last error feedback inline for token retrieval to aid debugging
* 3. Provide initial setup validation by fast-failing if the token couldn't be retrieved
* 4. Inline getToken shouldn't execute direct IAM calls as it can overload the token retrieval * resulting in throttles
* 5. Typical scale is tens of thousands of Redis connections and the IAM token is required for * every connection being established.
*/ private static final class RedisClusterCredentialsProvider implements RedisCredentialsProvider, Runnable, Closeable { private static final Logger logger = Logger.getLogger(RedisClusterCredentialsProvider.class.getName()); private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); private final IamCredentialsClient iamClient; private final String accountName; private final Duration refreshDuration; private final Duration lifetime; private volatile RedisCredentials credentials; private volatile Instant lastRefreshInstant; private volatile Exception lastException; /** * AccountName: * "projects/-/serviceAccounts/example-service-account@example-project."; * RefreshDuration: Duration.ofSeconds(300) Lifetime: Duration.ofSeconds(3600); */ public RedisClusterCredentialsProvider( String accountName, Duration refreshDuration, Duration lifetime) throws Exception { this.iamClient = IamCredentialsClient.create(); this.accountName = accountName; this.refreshDuration = refreshDuration; this.lifetime = lifetime; // execute on initialization to fast-fail if there are any setup issues refreshTokenNow(); // refresh much more frequently than the expiry time to allow for multiple retries in case of // failures service.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS); } @Override public MonoresolveCredentials() { if (hasTokenExpired()) { throw new RuntimeException("Background IAM token refresh failed", lastException); } return Mono.just(this.credentials); } private boolean hasTokenExpired() { if (this.lastRefreshInstant == null || this.lifetime == null) { return true; } return Instant.now().isAfter(this.lastRefreshInstant.plus(this.lifetime)); } // To be invoked by customer app on shutdown @Override public void close() { service.shutdown(); iamClient.close(); } @Override public void run() { try { // fetch token if it is time to refresh if (this.lastRefreshInstant != null && this.refreshDuration != null && Instant.now().isBefore(this.lastRefreshInstant.plus(this.refreshDuration))) { // nothing to do return; } refreshTokenNow(); } catch (Exception e) { // suppress all errors as we cannot allow the task to die // log for visibility logger.log(Level.parse("SEVERE"), "Background IAM token refresh failed", e); } } private void refreshTokenNow() { try { logger.info("Refreshing IAM token"); com.google.protobuf.Duration lifetimeProto = com.google.protobuf.Duration.newBuilder() .setSeconds(lifetime.getSeconds()) .setNanos(lifetime.getNano()) .build(); GenerateAccessTokenResponse response = this.iamClient.generateAccessToken( this.accountName, new ArrayList<>(), Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"), lifetimeProto); // got a successful token refresh this.credentials = new RedisCredentials() { @Override public boolean hasUsername() { return false; } @Override public boolean hasPassword() { return true; } @Override public String getUsername() { return "default"; } @Override public char[] getPassword() { return response.getAccessToken().toCharArray(); } }; this.lastRefreshInstant = Instant.now(); // clear the last saved exception this.lastException = null; logger.info( "IAM token refreshed with lastRefreshInstant [" + lastRefreshInstant + "], refreshDuration [" + this.refreshDuration + "], accountName [" + this.accountName + "] and lifetime [" + this.lifetime + "]"); } catch (Exception e) { // Save last exception for inline feedback this.lastException = e; // Bubble up for direct feedback throw e; } } } /** Sample code to demonstrate how to use IAMAuth; not intended for production use */ public static void main(String[] args) throws Exception { // These are the parameters the user needs to replace String discoveryEndpointIp = "CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS"; int discoveryEndpointPort = CLUSTER_DISCOVERY_ENDPOINT_PORT_NUMBER; String accountName = "ACCOUNT_NAME"; String caFileName = "CA_FILE_NAME"; int refreshDurationSec = REFRESH_DURATION_SEC; int lifetimeSec = LIFETIME_SEC; RedisCredentialsProvider credentialsProvider = new RedisClusterCredentialsProvider( accountName, Duration.ofSeconds(refreshDurationSec), Duration.ofSeconds(lifetimeSec)); RedisURI redisUri = RedisURI.Builder.redis(discoveryEndpointIp, discoveryEndpointPort) .withSsl(true) .withAuthentication(credentialsProvider) .build(); ClientResources resources = DefaultClientResources.builder() .reconnectDelay( Delay.fullJitter( Duration.ofMillis(100), Duration.ofSeconds(5), 100, TimeUnit.MILLISECONDS)) .build(); ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enablePeriodicRefresh(1, TimeUnit.MINUTES) .enableAllAdaptiveRefreshTriggers() .dynamicRefreshSources(false) .closeStaleConnections(true) .build(); SslOptions sslOptions = SslOptions.builder().jdkSslProvider().trustManager(new File(caFileName)).build(); SocketOptions socketOptions = SocketOptions.builder().connectTimeout(Duration.ofSeconds(5)).keepAlive(true).build(); // Create Redis Cluster Client RedisClusterClient clusterClient = RedisClusterClient.create(resources, redisUri); clusterClient.setOptions( ClusterClientOptions.builder() .topologyRefreshOptions(topologyRefreshOptions) .socketOptions(socketOptions) .sslOptions(sslOptions) .autoReconnect(true) .nodeFilter( it -> !(it.is(RedisClusterNode.NodeFlag.FAIL) || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL) || it.is(RedisClusterNode.NodeFlag.NOADDR))) .validateClusterNodeMembership(false) .build()); // Establish connection to Redis Cluster StatefulRedisClusterConnection<String, String> connection = clusterClient.connect(); // Retrieve synchronous Redis Cluster commands RedisAdvancedClusterCommands<String, String> syncCommands = connection.sync(); // Perform Redis operations syncCommands.set("key1", "value1"); String value = syncCommands.get("key1"); System.out.println("Retrieved value: " + value); int count = 0; for (int i = 0; i < 1000; i++) { String k = "lettucekey" + String.valueOf(i); String v = "lettucevalue" + String.valueOf(i); syncCommands.set(k, v); String got = syncCommands.get(k); if (got.equals(v)) { count++; } else { System.out.println("unexpected value"); } } System.out.println("Successfully got " + String.valueOf(count) + " keys"); // Close the connection and shutdown the client connection.close(); clusterClient.shutdown(); ((Closeable) credentialsProvider).close(); } }
Jedi
Sebaiknya gunakan Jedis, versi 4.4.0 dan yang lebih baru.
import com.google.cloud.iam.credentials.v1.GenerateAccessTokenResponse; import com.google.cloud.iam.credentials.v1.IamCredentialsClient; import java.io.Closeable; import java.io.FileInputStream; import java.io.InputStream; import java.security.KeyStore; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.Connection; import redis.clients.jedis.DefaultJedisClientConfig; import redis.clients.jedis.DefaultRedisCredentials; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.RedisCredentials; import redis.clients.jedis.RedisCredentialsProvider; /** Customers are free to update/replace code as they see fit. */ public class IAMAuth { /** * This thread-safe implementation (excluding the main app below) is intended for production use. * It provides a background refresh logic that shouldn't overload IAM service in case. the * application has many many connections (connection storms can result in IAM throttles * otherwise).
*
* Guidelines for implementing similar logic for other clients:
* 1. Refresh IAM tokens in the background using a single thread/routine per client process
* 2. Provide last error feedback inline for token retrieval to aid debugging
* 3. Provide initial setup validation by fast-failing if the token couldn't be retrieved
* 4. Inline getToken shouldn't execute direct IAM calls as it can overload the token retrieval * resulting in throttles
* 5. Typical scale is tens of thousands of Redis connections and the IAM token is required for * every connection being established.
*/ private static final class RedisClusterCredentialsProvider implements RedisCredentialsProvider, Runnable, Closeable { private static final Logger logger = Logger.getLogger(RedisClusterCredentialsProvider.class.getName()); private final IamCredentialsClient iamClient; private final ScheduledExecutorService service; private final String accountName; private final Duration refreshDuration; private final Duration lifetime; private volatile RedisCredentials credentials; private volatile Instant lastRefreshInstant; private volatile Exception lastException; // AccountName: // "projects/-/serviceAccounts/example-service-account@example-project."; // RefreshDuration: Duration.ofSeconds(300); // Lifetime: Duration.ofSeconds(3600); public RedisClusterCredentialsProvider( String accountName, Duration refreshDuration, Duration lifetime) throws Exception { this.iamClient = IamCredentialsClient.create(); this.service = Executors.newSingleThreadScheduledExecutor(); this.accountName = accountName; this.refreshDuration = refreshDuration; this.lifetime = lifetime; // execute on initialization to fast-fail if there are any setup issues refreshTokenNow(); // refresh much more frequently than the expiry time to allow for multiple retries in case of // failures service.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS); } public RedisCredentials get() { if (hasTokenExpired()) { throw new RuntimeException("Background IAM token refresh failed", lastException); } return this.credentials; } private boolean hasTokenExpired() { if (this.lastRefreshInstant == null || this.lifetime == null) { return true; } return Instant.now().isAfter(this.lastRefreshInstant.plus(this.lifetime)); } // To be invoked by customer app on shutdown @Override public void close() { service.shutdown(); iamClient.close(); } @Override public void run() { try { // fetch token if it is time to refresh if (this.lastRefreshInstant != null && this.refreshDuration != null && Instant.now().isBefore(this.lastRefreshInstant.plus(this.refreshDuration))) { // nothing to do return; } refreshTokenNow(); } catch (Exception e) { // suppress all errors as we cannot allow the task to die // log for visibility logger.log(Level.parse("SEVERE"), "Background IAM token refresh failed", e); } } private void refreshTokenNow() { try { logger.info("Refreshing IAM token"); Listdelegates = new ArrayList<>(); com.google.protobuf.Duration lifetimeProto = com.google.protobuf.Duration.newBuilder() .setSeconds(lifetime.getSeconds()) .setNanos(lifetime.getNano()) .build(); GenerateAccessTokenResponse response = iamClient.generateAccessToken( this.accountName, delegates, Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"), lifetimeProto); // got a successful token refresh this.credentials = new DefaultRedisCredentials("default", response.getAccessToken()); this.lastRefreshInstant = Instant.now(); // clear the last saved exception this.lastException = null; logger.info( "IAM token refreshed with lastRefreshInstant [" + lastRefreshInstant + "], refreshDuration [" + this.refreshDuration + "], accountName [" + this.accountName + "] and lifetime [" + this.lifetime + "]"); } catch (Exception e) { // Save last exception for inline feedback this.lastException = e; // Bubble up for direct feedback throw e; } } } /** Sample code to demonstrate how to use IAMAuth; not intended for production use */ public static void main(String[] args) throws Exception { String discoveryEndpointIp = "CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS"; int discoveryEndpointPort = CLUSTER_DISCOVERY_ENDPOINT_PORT_NUMBER; GenericObjectPoolConfig config = new GenericObjectPoolConfig (); config.setTestWhileIdle(true); int timeout = 5000; int maxAttempts = 5; HostAndPort discovery = new HostAndPort(discoveryEndpointIp, discoveryEndpointPort); RedisCredentialsProvider credentialsProvider = new RedisClusterCredentialsProvider( "projects/-/serviceAccounts/example-service-account@example-project.", Duration.ofSeconds(300), Duration.ofSeconds(3600)); // Create JedisCluster cluster InputStream is = new FileInputStream("server-ca.pem"); // You could get a resource as a stream instead. CertificateFactory cf = CertificateFactory.getInstance("X.509"); X509Certificate caCert = (X509Certificate) cf.generateCertificate(is); TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); ks.load(null); // You don't need the KeyStore cluster to come from a file. ks.setCertificateEntry("caCert", caCert); tmf.init(ks); SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, tmf.getTrustManagers(), null); JedisCluster jedisCluster = new JedisCluster( discovery, DefaultJedisClientConfig.builder() .connectionTimeoutMillis(timeout) .socketTimeoutMillis(timeout) .credentialsProvider(credentialsProvider) .ssl(true) .sslSocketFactory(sslContext.getSocketFactory()) .build(), maxAttempts, config); // Perform operations on the cluster jedisCluster.set("myKey", "Hello, Redis Cluster!"); String value = jedisCluster.get("myKey"); System.out.println("Value for myKey: " + value); int count = 0; for (int i = 0; i < 1000; i++) { String k = "jediskey" + String.valueOf(i); String v = "jedisvalue" + String.valueOf(i); jedisCluster.set(k, v); String got = jedisCluster.get(k); if (got.equals(v)) { count++; } else { System.out.println("unexpected value"); } } System.out.println("Successfully got " + String.valueOf(count) + " keys"); // Disconnect from the cluster jedisCluster.close(); // Cleanup the resources used by the provider ((Closeable) credentialsProvider).close(); } }
Go
Sebaiknya gunakan Go, versi 1.24.5 dan yang lebih baru.
package main import ( "context" "crypto/tls" "crypto/x509" "flag" "fmt" "io/ioutil" "log" "sync" "time" credentials "google.golang.org/genproto/googleapis/iam/credentials/v1" "github.com/golang/protobuf/ptypes" "github.com/redis/go-redis/v9" "google.golang.org/api/option" gtransport "google.golang.org/api/transport/grpc" ) var ( svcAccount = flag.String("a", "projects/-/serviceAccounts/example-service-account@example-project.", "service account email") lifetime = flag.Duration("d", time.Hour, "lifetime of token") refreshDuration = flag.Duration("r", 5*time.Minute, "token refresh duration") checkTokenExpiryInterval = flag.Duration("e", 10*time.Second, "check token expiry interval") lastRefreshInstant = time.Time{} errLastSeen = error(nil) token = "" mu = sync.RWMutex{} ) func retrieveToken() (string, error) { ctx := context.Background() conn, err := gtransport.Dial(ctx, option.WithEndpoint("iamcredentials.googleapis.com:443"), option.WithScopes("https://www.googleapis.com/auth/cloud-platform")) if err != nil { log.Printf("Failed to dial API, error: %v", err) return token, err } client := credentials.NewIAMCredentialsClient(conn) req := credentials.GenerateAccessTokenRequest{ Name: *svcAccount, Scope: []string{"https://www.googleapis.com/auth/cloud-platform"}, Lifetime: ptypes.DurationProto(*lifetime), } rsp, err := client.GenerateAccessToken(ctx, &req) if err != nil { log.Printf"Failed to call GenerateAccessToken with request: %v, error: %v", req, err) return token, err } return rsp.AccessToken, nil } func refreshTokenLoop() { if *refreshDuration > *lifetime { log.Fatal("Refresh should not happen after token is already expired.") } for { mu.RLock() lastRefreshTime := lastRefreshInstant mu.RUnlock() if time.Now().After(lastRefreshTime.Add(*refreshDuration)) { var err error retrievedToken, err := retrieveToken() mu.Lock() token = retrievedToken if err != nil { errLastSeen = err } else { lastRefreshInstant = time.Now() } mu.Unlock() } time.Sleep(*checkTokenExpiryInterval) } } func retrieveTokenFunc() (string, string) { mu.RLock() defer mu.RUnlock() if time.Now().After(lastRefreshInstant.Add(*refreshDuration)) { log.Printf("Token is expired. last refresh instant: %v, refresh duration: %v, error that was last seen: %v", lastRefreshInstant, *refreshDuration, errLastSeen) return "", "" } username := "default" password := token return username, password } func main() { // Load CA cert caFilePath := CA_FILE_PATH clusterDicEpAddr := CLUSTER_DISCOVERY_ENDPOINT_IP_ADDRESS_AND_PORT caCert, err := ioutil.ReadFile(caFilePath) if err != nil { log.Fatal(err) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) token, err = retrieveToken() if err != nil { log.Fatal("Cannot retrieve IAM token to authenticate to the cluster, error: %v", err) } lastRefreshInstant = time.Now() go refreshTokenLoop() // Setup Redis Connection pool client := redis.NewClusterClient(&redis.ClusterOptions{ Addrs: []string{clusterDicEpAddr}, // PoolSize applies per cluster node and not for the whole cluster. PoolSize: 10, ConnMaxIdleTime: 60 * time.Second, MinIdleConns: 1, CredentialsProvider: retrieveTokenFunc, TLSConfig: &tls.Config{ RootCAs: caCertPool, }, }) ctx := context.Background() err = client.Set(ctx, "key", "value", 0).Err() if err != nil { log.Fatal(err) } val, err := client.Get(ctx, "key").Result() if err != nil { log.Fatal(err) } fmt.Printf("Got the value for key: key, which is %s \n", val) }