Este documento descreve como executar um pipeline do Dataflow através de um contentor personalizado.
Para informações sobre como criar a imagem do contentor, consulte o artigo Crie imagens de contentores personalizadas para o Dataflow.
Quando executar o pipeline, inicie-o com o Apache Beam SDK com a mesma versão e versão do idioma que o SDK na imagem do contentor personalizado. Este passo evita erros inesperados de dependências ou SDKs incompatíveis.
Teste localmente
Antes de executar o pipeline no Dataflow, é recomendável testar a imagem do contentor localmente, o que permite testes e depuração mais rápidos.
Para saber mais sobre a utilização específica do Apache Beam, consulte o guia do Apache Beam para Executar pipelines com imagens de contentores personalizados.
Testes básicos com o PortableRunner
Para verificar se é possível obter imagens de contentores remotos e executar um pipeline simples, use o Apache Beam PortableRunner
. Quando usa o comando
PortableRunner
, o envio de tarefas ocorre no ambiente local e a execução
DoFn
acontece no ambiente Docker.
Quando usa GPUs, o contentor Docker pode não ter acesso às GPUs. Para testar o contentor com GPUs, use o executador direto e siga os passos para testar uma imagem de contentor numa VM autónoma com GPUs na secção Depurar com uma VM autónoma da página "Usar GPUs".
O seguinte executa um exemplo de pipeline:
Java
mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
-Dexec.args="--runner=PortableRunner \
--jobEndpoint=REGION \
--defaultEnvironmentType=DOCKER \
--defaultEnvironmentConfig=IMAGE_URI \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE"
Python
python path/to/my/pipeline.py \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILE
Go
go path/to/my/pipeline.go \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILE
Substitua o seguinte:
REGION
: a região do serviço de tarefas a usar, no formato de endereço e porta. Por exemplo:localhost:3000
. Useembed
para executar um serviço de tarefas no processo.IMAGE_URI
: o URI da imagem do contentor personalizado.INPUT_FILE
: um ficheiro de entrada que pode ser lido como um ficheiro de texto. Este ficheiro tem de estar acessível através da plataforma de teste do SDK
da imagem do contentor, pré-carregado na imagem do contentor ou num ficheiro remoto.OUTPUT_FILE
: um caminho para escrever a saída. Este caminho é um caminho remoto ou um caminho local no contentor.
Quando o pipeline for concluído com êxito, reveja os registos da consola para verificar se o pipeline foi concluído com êxito e se a imagem remota, especificada por IMAGE_URI
, é usada.
Após a execução do pipeline, os ficheiros guardados no contentor não se encontram no seu sistema de ficheiros local, e o contentor é parado. Pode copiar ficheiros do sistema de ficheiros do contentor parado através de docker cp
.
Em alternativa:
- Fornecer resultados a um sistema de ficheiros remoto, como o Cloud Storage. Pode ter de configurar manualmente o acesso para fins de teste, incluindo para ficheiros de credenciais ou credenciais predefinidas da aplicação.
- Para uma depuração rápida, adicione registo temporário.
Use o Direct Runner
Para testes locais mais detalhados da imagem do contentor e do pipeline, use o Direct Runner do Apache Beam.
Pode validar o pipeline separadamente do contentor testando-o num ambiente local que corresponda à imagem do contentor ou iniciando o pipeline num contentor em execução.
Java
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...
Python
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# python path/to/my/pipeline.py ...
Go
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# go path/to/my/pipeline.go ...
Substitua IMAGE_URI
pelo URI da imagem do contentor personalizado.
Os exemplos pressupõem que todos os ficheiros de pipeline, incluindo o próprio pipeline, estão no contentor personalizado, foram montados a partir de um sistema de ficheiros local ou são remotos e acessíveis pelo Apache Beam e pelo contentor. Por exemplo, para usar o
Maven (mvn
) para executar o exemplo de Java anterior, o Maven e as respetivas dependências têm de ser preparados no contentor. Para mais informações, consulte as secções
Armazenamento e
docker run
na documentação do Docker.
O objetivo dos testes no Direct Runner é testar o pipeline
no ambiente do contentor personalizado e não testar a execução do contentor
com o ENTRYPOINT
predefinido. Modifique o ENTRYPOINT
(por exemplo, docker run --entrypoint ...
) para executar diretamente o pipeline ou permitir a execução manual de comandos no contentor.
Se depender de uma configuração específica baseada na execução do contentor no Compute Engine, pode executar o contentor diretamente numa VM do Compute Engine. Para mais informações, consulte o artigo Recipientes no Compute Engine.
Inicie a tarefa do Dataflow
Quando iniciar o pipeline do Apache Beam no Dataflow, especifique o caminho para a imagem do contentor. Não use a etiqueta :latest
com as suas imagens personalizadas. Etiquete as suas compilações com uma data ou um identificador único. Se algo correr mal, a utilização deste tipo de etiqueta pode permitir reverter a execução do pipeline para uma configuração de funcionamento conhecida anteriormente e permitir uma inspeção das alterações.
Java
Use --sdkContainerImage
para especificar uma imagem de contentor do SDK para o seu tempo de execução Java.
Use --experiments=use_runner_v2
para ativar o Runner v2.
Python
Se estiver a usar a versão 2.30.0 ou posterior do SDK, use a opção de pipeline --sdk_container_image
para especificar uma imagem do contentor do SDK.
Para versões anteriores do SDK, use a opção de pipeline --worker_harness_container_image
para especificar a localização da imagem do contentor a usar para o worker harness.
Os contentores personalizados só são suportados para o Dataflow Runner v2. Se
estiver a iniciar um pipeline Python em lote, defina a flag --experiments=use_runner_v2
.
Se estiver a iniciar um pipeline Python de streaming, não é necessário especificar a experiência, porque os pipelines Python de streaming usam o Runner v2 por predefinição.
Go
Se usar a versão 2.40.0 ou posterior do SDK, use a opção de pipeline --sdk_container_image
para especificar uma imagem do contentor do SDK.
Para versões anteriores do SDK, use a opção de pipeline --worker_harness_container_image
para especificar a localização da imagem do contentor a usar para o worker harness.
Os contentores personalizados são suportados em todas as versões do SDK Go porque usam o Dataflow Runner v2 por predefinição.
O exemplo seguinte demonstra como iniciar o exemplo
WordCount
em lote
com um contentor personalizado.
Java
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--gcpTempLocation=TEMP_LOCATION \
--diskSizeGb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdkContainerImage=IMAGE_URI"
Python
Usar a versão 2.30.0 ou posterior do SDK do Apache Beam para Python:
python -m apache_beam.examples.wordcount \
--input=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--temp_location=TEMP_LOCATION \
--runner=DataflowRunner \
--disk_size_gb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdk_container_image=IMAGE_URI
Go
wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--sdk_container_image=IMAGE_URI
Substitua o seguinte:
INPUT_FILE
: o caminho de entrada do Cloud Storage lido pelo Dataflow quando executa o exemplo.OUTPUT_FILE
: o caminho de saída do Cloud Storage escrito pelo pipeline de exemplo. Este ficheiro contém as contagens de palavras.PROJECT_ID
: o ID do seu projeto Google Cloud.REGION
: a região onde implementar a sua tarefa do Dataflow.TEMP_LOCATION
: o caminho do Cloud Storage para o Dataflow preparar ficheiros de tarefas temporários criados durante a execução do pipeline.DISK_SIZE_GB
: opcional. Se o seu contentor for grande, considere aumentar o tamanho do disco de arranque predefinido para evitar ficar sem espaço em disco.IMAGE_URI
: o URI da imagem do contentor personalizado do SDK. Use sempre um SHA ou uma etiqueta de contentor com versão. Não use a etiqueta:latest
nem uma etiqueta mutável.
Streaming de imagens de contentores
Pode melhorar a latência de início e de escalabilidade automática do pipeline do Dataflow ativando o streaming de imagens. Esta funcionalidade é útil se o seu contentor personalizado contiver conteúdo estranho ou não usar todo o conteúdo em cada passo. Por exemplo, o seu contentor pode conter conteúdo acidental, como código de biblioteca baseado na CPU para inferência baseada na GPU. Da mesma forma, pode ter um contentor que execute pipelines de ML com vários modelos que usam apenas um modelo em cada passo, pelo que o respetivo conteúdo não é necessário de uma só vez. A ativação do streaming de imagens ajuda a melhorar a latência nestes casos.
Java
--dataflowServiceOptions=enable_image_streaming
Python
--dataflow_service_options=enable_image_streaming
Go
--dataflow_service_options=enable_image_streaming
O streaming de imagens obtém partes do seu contentor personalizado à medida que o código do pipeline precisa delas, em vez de transferir todo o contentor antecipadamente. As partes do seu contentor que não são usadas nunca têm de ser transferidas.
Tem de ter a API do sistema de ficheiros de contentores ativada para beneficiar do streaming de imagens.