Ce document explique comment écrire des données textuelles depuis Dataflow vers Cloud Storage à l'aide du Connecteur d'E/S PubSubIO
d'Apache Beam.
Présentation
Pour écrire des données dans Pub/Sub, utilisez le connecteur PubSubIO
. Les éléments d'entrée peuvent être des messages Pub/Sub ou simplement les données des messages.
Si les éléments d'entrée sont des messages Pub/Sub, vous pouvez éventuellement définir des attributs ou une clé de tri sur chaque message.
Vous pouvez utiliser la version Java, Python ou Go du connecteur PubSubIO
, comme suit :
Java
Pour écrire dans un seul sujet, appelez la méthode PubsubIO.writeMessages
. Cette méthode utilise une collection d'objets PubsubMessage
en entrée. Le connecteur définit également des méthodes pratiques pour écrire des chaînes, des messages Avro encodés au format binaire ou des messages Protobuf encodés au format binaire. Ces méthodes convertissent la collection d'entrée en messages Pub/Sub.
Pour écrire dans un ensemble dynamique de thèmes en fonction des données d'entrée, appelez writeMessagesDynamic
. Spécifiez le sujet de destination pour chaque message en appelant PubsubMessage.withTopic
sur le message. Par exemple, vous pouvez acheminer les messages vers différents sujets en fonction de la valeur d'un champ spécifique dans vos données d'entrée.
Pour plus d'informations, consultez la documentation de référence de PubsubIO
.
Python
Appelez la méthode pubsub.WriteToPubSub
.
Par défaut, cette méthode utilise une collection d'entrée de type bytes
, qui représente la charge utile du message. Si le paramètre with_attributes
est True
, la méthode utilise une collection d'objets PubsubMessage
.
Pour en savoir plus, consultez la documentation de référence du module pubsub
.
Go
Pour écrire des données dans Pub/Sub, appelez la méthode pubsubio.Write
. Cette méthode utilise une collection d'entrée d'objets PubSubMessage
ou de tranches d'octets contenant les charges utiles des messages.
Pour plus d'informations, consultez la documentation de référence du package pubsubio
.
Pour en savoir plus sur les messages Pub/Sub, consultez Format des messages dans la documentation Pub/Sub.
Horodatages
Pub/Sub définit un code temporel pour chaque message. Ce code temporel représente l'heure à laquelle le message est publié dans Pub/Sub. Dans un scénario de streaming, vous pouvez également vous intéresser à l'horodatage de l'événement, qui correspond à l'heure à laquelle les données du message ont été générées. Vous pouvez utiliser l'horodatage des éléments Apache Beam pour représenter l'heure de l'événement. Les sources qui créent une PCollection
illimitée attribuent souvent un horodatage à chaque nouvel élément, qui correspond à l'heure de l'événement.
Pour Java et Python, le connecteur d'E/S Pub/Sub peut écrire le code temporel de chaque élément en tant qu'attribut de message Pub/Sub. Les consommateurs de messages peuvent utiliser cet attribut pour obtenir le code temporel de l'événement.
Java
Appelez PubsubIO.Write<T>.withTimestampAttribute
et spécifiez le nom de l'attribut.
Python
Spécifiez le paramètre timestamp_attribute
lorsque vous appelez WriteToPubSub
.
Distribution des messages
Dataflow est compatible avec le traitement "exactement une fois" des messages dans un pipeline. Toutefois, le connecteur d'E/S Pub/Sub ne peut pas garantir la distribution de type "exactement une fois" des messages via Pub/Sub.
Pour Java et Python, vous pouvez configurer le connecteur d'E/S Pub/Sub pour qu'il écrive l'ID unique de chaque élément en tant qu'attribut de message. Les consommateurs de messages peuvent ensuite utiliser cet attribut pour dédupliquer les messages.
Java
Appelez PubsubIO.Write<T>.withIdAttribute
et spécifiez le nom de l'attribut.
Python
Spécifiez le paramètre id_label
lorsque vous appelez WriteToPubSub
.
Sortie directe
Si vous activez le mode de traitement en flux continu de type "au moins une fois" dans votre pipeline, le connecteur d'E/S utilise la sortie directe. Dans ce mode, le connecteur ne crée pas de point de contrôle pour les messages, ce qui permet d'écrire plus rapidement. Toutefois, les nouvelles tentatives dans ce mode peuvent entraîner la duplication de messages avec des ID différents, ce qui peut rendre la déduplication des messages plus difficile pour les consommateurs de messages.
Pour les pipelines qui utilisent le mode "exactement une fois", vous pouvez activer la sortie directe en définissant l'option de service streaming_enable_pubsub_direct_output
. La sortie directe réduit la latence en écriture et permet un traitement plus efficace. Envisagez cette option si vos consommateurs de messages peuvent gérer les messages en double avec des ID de message non uniques.
Exemples
L'exemple suivant crée un PCollection
de messages Pub/Sub et les écrit dans un sujet Pub/Sub. Le thème est spécifié comme option de pipeline. Chaque message contient des données de charge utile et un ensemble d'attributs.
Java
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Python
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.