Execute uma tarefa do Dataflow num contentor personalizado

Este documento descreve como executar um pipeline do Dataflow através de um contentor personalizado.

Para informações sobre como criar a imagem do contentor, consulte o artigo Crie imagens de contentores personalizadas para o Dataflow.

Quando executar o pipeline, inicie-o com o Apache Beam SDK com a mesma versão e versão do idioma que o SDK na imagem do contentor personalizado. Este passo evita erros inesperados de dependências ou SDKs incompatíveis.

Teste localmente

Antes de executar o pipeline no Dataflow, é recomendável testar a imagem do contentor localmente, o que permite testes e depuração mais rápidos.

Para saber mais sobre a utilização específica do Apache Beam, consulte o guia do Apache Beam para Executar pipelines com imagens de contentores personalizados.

Testes básicos com o PortableRunner

Para verificar se é possível obter imagens de contentores remotos e executar um pipeline simples, use o Apache Beam PortableRunner. Quando usa o comando PortableRunner, o envio de tarefas ocorre no ambiente local e a execução DoFn acontece no ambiente Docker.

Quando usa GPUs, o contentor Docker pode não ter acesso às GPUs. Para testar o contentor com GPUs, use o executador direto e siga os passos para testar uma imagem de contentor numa VM autónoma com GPUs na secção Depurar com uma VM autónoma da página "Usar GPUs".

O seguinte executa um exemplo de pipeline:

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Go

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Substitua o seguinte:

  • REGION: a região do serviço de tarefas a usar, no formato de endereço e porta. Por exemplo: localhost:3000. Use embed para executar um serviço de tarefas no processo.
  • IMAGE_URI: o URI da imagem do contentor personalizado.
  • INPUT_FILE: um ficheiro de entrada que pode ser lido como um ficheiro de texto. Este ficheiro tem de estar acessível através da plataforma de teste do SDK
    da imagem do contentor, pré-carregado na imagem do contentor ou num ficheiro remoto.
  • OUTPUT_FILE: um caminho para escrever a saída. Este caminho é um caminho remoto ou um caminho local no contentor.

Quando o pipeline for concluído com êxito, reveja os registos da consola para verificar se o pipeline foi concluído com êxito e se a imagem remota, especificada por IMAGE_URI, é usada.

Após a execução do pipeline, os ficheiros guardados no contentor não se encontram no seu sistema de ficheiros local, e o contentor é parado. Pode copiar ficheiros do sistema de ficheiros do contentor parado através de docker cp.

Em alternativa:

  • Fornecer resultados a um sistema de ficheiros remoto, como o Cloud Storage. Pode ter de configurar manualmente o acesso para fins de teste, incluindo para ficheiros de credenciais ou credenciais predefinidas da aplicação.
  • Para uma depuração rápida, adicione registo temporário.

Use o Direct Runner

Para testes locais mais detalhados da imagem do contentor e do pipeline, use o Direct Runner do Apache Beam.

Pode validar o pipeline separadamente do contentor testando-o num ambiente local que corresponda à imagem do contentor ou iniciando o pipeline num contentor em execução.

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Go

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

Substitua IMAGE_URI pelo URI da imagem do contentor personalizado.

Os exemplos pressupõem que todos os ficheiros de pipeline, incluindo o próprio pipeline, estão no contentor personalizado, foram montados a partir de um sistema de ficheiros local ou são remotos e acessíveis pelo Apache Beam e pelo contentor. Por exemplo, para usar o Maven (mvn) para executar o exemplo de Java anterior, o Maven e as respetivas dependências têm de ser preparados no contentor. Para mais informações, consulte as secções Armazenamento e docker run na documentação do Docker.

O objetivo dos testes no Direct Runner é testar o pipeline no ambiente do contentor personalizado e não testar a execução do contentor com o ENTRYPOINT predefinido. Modifique o ENTRYPOINT (por exemplo, docker run --entrypoint ...) para executar diretamente o pipeline ou permitir a execução manual de comandos no contentor.

Se depender de uma configuração específica baseada na execução do contentor no Compute Engine, pode executar o contentor diretamente numa VM do Compute Engine. Para mais informações, consulte o artigo Recipientes no Compute Engine.

Inicie a tarefa do Dataflow

Quando iniciar o pipeline do Apache Beam no Dataflow, especifique o caminho para a imagem do contentor. Não use a etiqueta :latest com as suas imagens personalizadas. Etiquete as suas compilações com uma data ou um identificador único. Se algo correr mal, a utilização deste tipo de etiqueta pode permitir reverter a execução do pipeline para uma configuração de funcionamento conhecida anteriormente e permitir uma inspeção das alterações.

Java

Use --sdkContainerImage para especificar uma imagem de contentor do SDK para o seu tempo de execução Java.

Use --experiments=use_runner_v2 para ativar o Runner v2.

Python

Se estiver a usar a versão 2.30.0 ou posterior do SDK, use a opção de pipeline --sdk_container_image para especificar uma imagem do contentor do SDK.

Para versões anteriores do SDK, use a opção de pipeline --worker_harness_container_image para especificar a localização da imagem do contentor a usar para o worker harness.

Os contentores personalizados só são suportados para o Dataflow Runner v2. Se estiver a iniciar um pipeline Python em lote, defina a flag --experiments=use_runner_v2. Se estiver a iniciar um pipeline Python de streaming, não é necessário especificar a experiência, porque os pipelines Python de streaming usam o Runner v2 por predefinição.

Go

Se usar a versão 2.40.0 ou posterior do SDK, use a opção de pipeline --sdk_container_image para especificar uma imagem do contentor do SDK.

Para versões anteriores do SDK, use a opção de pipeline --worker_harness_container_image para especificar a localização da imagem do contentor a usar para o worker harness.

Os contentores personalizados são suportados em todas as versões do SDK Go porque usam o Dataflow Runner v2 por predefinição.

O exemplo seguinte demonstra como iniciar o exemplo WordCount em lote com um contentor personalizado.

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

Usar a versão 2.30.0 ou posterior do SDK do Apache Beam para Python:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Go

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

Substitua o seguinte:

  • INPUT_FILE: o caminho de entrada do Cloud Storage lido pelo Dataflow quando executa o exemplo.
  • OUTPUT_FILE: o caminho de saída do Cloud Storage escrito pelo pipeline de exemplo. Este ficheiro contém as contagens de palavras.
  • PROJECT_ID: o ID do seu projeto Google Cloud.
  • REGION: a região onde implementar a sua tarefa do Dataflow.
  • TEMP_LOCATION: o caminho do Cloud Storage para o Dataflow preparar ficheiros de tarefas temporários criados durante a execução do pipeline.
  • DISK_SIZE_GB: opcional. Se o seu contentor for grande, considere aumentar o tamanho do disco de arranque predefinido para evitar ficar sem espaço em disco.
  • IMAGE_URI: o URI da imagem do contentor personalizado do SDK. Use sempre um SHA ou uma etiqueta de contentor com versão. Não use a etiqueta :latest nem uma etiqueta mutável.

Streaming de imagens de contentores

Pode melhorar a latência de início e de escalabilidade automática do pipeline do Dataflow ativando o streaming de imagens. Esta funcionalidade é útil se o seu contentor personalizado contiver conteúdo estranho ou não usar todo o conteúdo em cada passo. Por exemplo, o seu contentor pode conter conteúdo acidental, como código de biblioteca baseado na CPU para inferência baseada na GPU. Da mesma forma, pode ter um contentor que execute pipelines de ML com vários modelos que usam apenas um modelo em cada passo, pelo que o respetivo conteúdo não é necessário de uma só vez. A ativação do streaming de imagens ajuda a melhorar a latência nestes casos.

Java

--dataflowServiceOptions=enable_image_streaming

Python

--dataflow_service_options=enable_image_streaming

Go

--dataflow_service_options=enable_image_streaming

O streaming de imagens obtém partes do seu contentor personalizado à medida que o código do pipeline precisa delas, em vez de transferir todo o contentor antecipadamente. As partes do seu contentor que não são usadas nunca têm de ser transferidas.

Tem de ter a API do sistema de ficheiros de contentores ativada para beneficiar do streaming de imagens.