[[["Easy to understand","easyToUnderstand","thumb-up"],["Solved my problem","solvedMyProblem","thumb-up"],["Other","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Other","otherDown","thumb-down"]],["Last updated 2025-08-28 UTC."],[],[],null,["# Class CloudPubSubSourceConnector (1.2.0)\n\nVersion latestkeyboard_arrow_down\n\n- [1.2.0 (latest)](/java/docs/reference/pubsub-group-kafka-connector/latest/com.google.pubsub.kafka.source.CloudPubSubSourceConnector)\n- [1.1.0](/java/docs/reference/pubsub-group-kafka-connector/1.1.0/com.google.pubsub.kafka.source.CloudPubSubSourceConnector)\n- [1.0.0](/java/docs/reference/pubsub-group-kafka-connector/1.0.0/com.google.pubsub.kafka.source.CloudPubSubSourceConnector)\n- [0.1.5](/java/docs/reference/pubsub-group-kafka-connector/0.1.5/com.google.pubsub.kafka.source.CloudPubSubSourceConnector) \n\n public class CloudPubSubSourceConnector extends SourceConnector\n\nA SourceConnector that writes messages to a specific topic in [Apache Kafka](http://kafka.apache.org/). \n\nInheritance\n-----------\n\n[java.lang.Object](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html) \\\u003e org.apache.kafka.connect.connector.Connector \\\u003e org.apache.kafka.connect.source.SourceConnector \\\u003e CloudPubSubSourceConnector \n\nInherited Members\n-----------------\n\n[Object.clone()](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#clone--) \n[Object.equals(Object)](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#equals-java.lang.Object-) \n[Object.finalize()](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#finalize--) \n[Object.getClass()](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#getClass--) \n[Object.hashCode()](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#hashCode--) \n[Object.notify()](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#notify--) \n[Object.notifyAll()](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#notifyAll--) \n[Object.toString()](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#toString--) \n[Object.wait()](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#wait--) \n[Object.wait(long)](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#wait-long-) \n[Object.wait(long,int)](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#wait-long-int-) \norg.apache.kafka.connect.connector.Connector.config() \norg.apache.kafka.connect.connector.Connector.initialize(org.apache.kafka.connect.connector.ConnectorContext) \norg.apache.kafka.connect.connector.Connector.initialize(org.apache.kafka.connect.connector.ConnectorContext,java.util.List\\\u003cjava.util.Map\\\u003cjava.lang.String,java.lang.String\\\u003e\\\u003e) \norg.apache.kafka.connect.connector.Connector.reconfigure(java.util.Map\\\u003cjava.lang.String,java.lang.String\\\u003e) \norg.apache.kafka.connect.connector.Connector.start(java.util.Map\\\u003cjava.lang.String,java.lang.String\\\u003e) \norg.apache.kafka.connect.connector.Connector.stop() \norg.apache.kafka.connect.connector.Connector.taskClass() \norg.apache.kafka.connect.connector.Connector.taskConfigs(int) \norg.apache.kafka.connect.connector.Connector.validate(java.util.Map\\\u003cjava.lang.String,java.lang.String\\\u003e) \norg.apache.kafka.connect.source.SourceConnector.canDefineTransactionBoundaries(java.util.Map\\\u003cjava.lang.String,java.lang.String\\\u003e) \norg.apache.kafka.connect.source.SourceConnector.context() \norg.apache.kafka.connect.source.SourceConnector.exactlyOnceSupport(java.util.Map\\\u003cjava.lang.String,java.lang.String\\\u003e)\n\nStatic Fields\n-------------\n\n### CPS_MAKE_ORDERING_KEY_ATTRIBUTE\n\n public static final String CPS_MAKE_ORDERING_KEY_ATTRIBUTE\n\n### CPS_MAX_BATCH_SIZE_CONFIG\n\n public static final String CPS_MAX_BATCH_SIZE_CONFIG\n\n### CPS_STREAMING_PULL_ENABLED\n\n public static final String CPS_STREAMING_PULL_ENABLED\n\n### CPS_STREAMING_PULL_FLOW_CONTROL_BYTES\n\n public static final String CPS_STREAMING_PULL_FLOW_CONTROL_BYTES\n\n### CPS_STREAMING_PULL_FLOW_CONTROL_MESSAGES\n\n public static final String CPS_STREAMING_PULL_FLOW_CONTROL_MESSAGES\n\n### CPS_STREAMING_PULL_MAX_ACK_EXTENSION_MS\n\n public static final String CPS_STREAMING_PULL_MAX_ACK_EXTENSION_MS\n\n### CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION\n\n public static final String CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION\n\n### CPS_STREAMING_PULL_PARALLEL_STREAMS\n\n public static final String CPS_STREAMING_PULL_PARALLEL_STREAMS\n\n### CPS_SUBSCRIPTION_CONFIG\n\n public static final String CPS_SUBSCRIPTION_CONFIG\n\n### DEFAULT_CPS_MAX_BATCH_SIZE\n\n public static final int DEFAULT_CPS_MAX_BATCH_SIZE\n\n### DEFAULT_KAFKA_PARTITIONS\n\n public static final int DEFAULT_KAFKA_PARTITIONS\n\n### DEFAULT_KAFKA_PARTITION_SCHEME\n\n public static final String DEFAULT_KAFKA_PARTITION_SCHEME\n\n### KAFKA_MESSAGE_KEY_CONFIG\n\n public static final String KAFKA_MESSAGE_KEY_CONFIG\n\n### KAFKA_MESSAGE_TIMESTAMP_CONFIG\n\n public static final String KAFKA_MESSAGE_TIMESTAMP_CONFIG\n\n### KAFKA_PARTITIONS_CONFIG\n\n public static final String KAFKA_PARTITIONS_CONFIG\n\n### KAFKA_PARTITION_SCHEME_CONFIG\n\n public static final String KAFKA_PARTITION_SCHEME_CONFIG\n\n### KAFKA_TOPIC_CONFIG\n\n public static final String KAFKA_TOPIC_CONFIG\n\n### USE_KAFKA_HEADERS\n\n public static final String USE_KAFKA_HEADERS\n\nConstructors\n------------\n\n### CloudPubSubSourceConnector()\n\n public CloudPubSubSourceConnector()\n\nMethods\n-------\n\n### config()\n\n public ConfigDef config()\n\n**Overrides** \norg.apache.kafka.connect.connector.Connector.config()\n\n### start(Map\\\u003cString,String\\\u003e props)\n\n public void start(Map\u003cString,String\u003e props)\n\n**Overrides** \norg.apache.kafka.connect.connector.Connector.start(java.util.Map\\\u003cjava.lang.String,java.lang.String\\\u003e)\n\n### stop()\n\n public void stop()\n\n**Overrides** \norg.apache.kafka.connect.connector.Connector.stop()\n\n### taskClass()\n\n public Class\u003c? extends Task\u003e taskClass()\n\n**Overrides** \norg.apache.kafka.connect.connector.Connector.taskClass()\n\n### taskConfigs(int maxTasks)\n\n public List\u003cMap\u003cString,String\u003e\u003e taskConfigs(int maxTasks)\n\n**Overrides** \norg.apache.kafka.connect.connector.Connector.taskConfigs(int)\n\n### verifySubscription(String cpsProject, String cpsSubscription, CredentialsProvider credentialsProvider)\n\n public void verifySubscription(String cpsProject, String cpsSubscription, CredentialsProvider credentialsProvider)\n\nCheck whether the user provided Cloud Pub/Sub subscription name specified by [#CPS_SUBSCRIPTION_CONFIG](/java/docs/reference/pubsub-group-kafka-connector/latest/com.google.pubsub.kafka.source.CloudPubSubSourceConnector#com_google_pubsub_kafka_source_CloudPubSubSourceConnector_CPS_SUBSCRIPTION_CONFIG) exists or not.\n\n### version()\n\n public String version()"]]