Este documento descreve como escrever dados do Dataflow para o Apache Kafka.
Para a maioria dos exemplos de utilização, considere usar o conector de E/S gerido para escrever no Kafka.
Se precisar de uma otimização do desempenho mais avançada, considere usar o conector KafkaIO
. O conetor KafkaIO
está disponível para
Java
ou através da utilização da
estrutura de pipelines multilingues
para Python
e Go.
Processamento exatamente uma vez
Por predefinição, o conetor KafkaIO
não fornece semântica exatamente uma vez para gravações. Isto significa que os dados podem ser escritos no seu tópico do Kafka várias vezes. Para ativar as escritas exatamente uma vez, chame o método withEOS
. As escritas exatamente uma vez garantem que os dados são escritos no tópico do Kafka de destino exatamente uma vez.
No entanto, também aumenta o custo do pipeline e diminui o débito.
Se não tiver requisitos rigorosos para a semântica exatamente uma vez e a lógica no seu pipeline conseguir processar registos duplicados, considere ativar o modo pelo menos uma vez para todo o pipeline de modo a reduzir os custos. Para mais informações, consulte o artigo Defina o modo de streaming do pipeline.
Drenos de condutas
Se esvaziar o pipeline, a semântica de exatamente uma vez não é garantida. A única garantia é que não se perdem dados confirmados. Como resultado, alguns dados podem ser processados enquanto o pipeline está a ser esvaziado, sem a confirmação das compensações de leitura de volta para o Kafka. Para alcançar a semântica de pelo menos uma vez para o Kafka quando modifica um pipeline, atualize o pipeline em vez de cancelar a tarefa e iniciar uma nova tarefa.
Ajuste o Kafka para uma semântica exatamente uma vez
Ajustar transaction.max.timeout.ms
e transactional.id.expiration.ms
pode complementar a sua estratégia geral de tolerância a falhas e de entrega exatamente uma vez.
No entanto, o respetivo impacto depende da natureza da indisponibilidade e da sua configuração específica. Defina transaction.max.timeout.ms
perto do tempo de retenção dos tópicos do Kafka para evitar a duplicação de dados causada por indisponibilidades do agente do Kafka.
Se um agente Kafka ficar temporariamente indisponível (por exemplo, devido a uma partição de rede ou a uma falha do nó) e um produtor tiver transações em curso, essas transações podem exceder o tempo limite. Aumentar o valor de
transaction.max.timeout.ms
dá às transações mais tempo para serem concluídas depois de um
agente ser recuperado, o que pode evitar a necessidade de reiniciar as transações e
reenviar mensagens. Esta mitigação ajuda indiretamente a manter a semântica de exatamente uma vez, reduzindo a probabilidade de mensagens duplicadas causadas por reinícios de transações. Por outro lado, um tempo de expiração mais curto pode ajudar a limpar os IDs transacionais inativos mais rapidamente, reduzindo a potencial utilização de recursos.
Configure a rede
Por predefinição, o Dataflow inicia instâncias na sua rede da nuvem virtual privada (VPC) predefinida. Consoante a configuração do Kafka, pode ter de configurar uma rede e uma sub-rede diferentes para o Dataflow. Para mais informações, consulte o artigo Especifique uma rede e uma sub-rede. Quando configurar a sua rede, crie regras de firewall que permitam que as máquinas de trabalho do Dataflow alcancem os agentes Kafka.
Se estiver a usar os VPC Service Controls, coloque o cluster Kafka dentro do perímetro dos VPC Service Controls ou, caso contrário, estenda os perímetros à VPN ou ao Cloud Interconnect autorizados.
Se o cluster do Kafka estiver implementado fora do Google Cloud, tem de criar uma ligação de rede entre o Dataflow e o cluster do Kafka. Existem várias opções de rede com diferentes compromissos:
- Estabeleça ligação através de um espaço de endereço RFC 1918 partilhado, usando uma das seguintes opções:
- Alcance o seu cluster Kafka alojado externamente através de endereços IP públicos usando uma das seguintes opções:
- Internet pública
- Intercâmbio direto
- Intercâmbio por operadora
O Dedicated Interconnect é a melhor opção para um desempenho e uma fiabilidade previsíveis, mas a configuração pode demorar mais tempo porque terceiros têm de aprovisionar os novos circuitos. Com uma topologia baseada em IPs públicos, pode começar rapidamente porque não é necessário fazer muito trabalho de rede.
As duas secções seguintes descrevem estas opções mais detalhadamente.
Espaço de endereço RFC 1918 partilhado
O Dedicated Interconnect e a VPN IPsec dão-lhe acesso direto a endereços IP RFC 1918 na sua nuvem privada virtual (VPC), o que pode simplificar a configuração do Kafka. Se estiver a usar uma topologia baseada em VPN, considere configurar uma VPN de débito elevado.
Por predefinição, o Dataflow inicia instâncias na sua rede VPC predefinida. Numa topologia de rede privada com
rotas definidas explicitamente no Cloud Router
que ligam sub-redes no Google Cloud a esse cluster do Kafka, precisa de
mais controlo sobre a localização das suas instâncias do Dataflow. Pode usar o Dataflow para configurar os network
e os subnetwork
parâmetros de execução.
Certifique-se de que a sub-rede correspondente tem endereços IP suficientes disponíveis para o Dataflow iniciar instâncias quando tenta aumentar a escala. Além disso, quando criar uma rede separada para lançar as suas instâncias do Dataflow, certifique-se de que tem uma regra de firewall que ativa o tráfego TCP entre todas as máquinas virtuais no projeto. A rede predefinida já tem esta regra de firewall configurada.
Espaço de endereços IP públicos
Esta arquitetura usa o Transport Layer Security (TLS) para proteger o tráfego entre clientes externos e o Kafka, e usa tráfego não encriptado para a comunicação entre agentes. Quando o ouvinte do Kafka se associa a uma interface de rede usada para comunicação interna e externa, a configuração do ouvinte é simples. No entanto, em muitos cenários, os endereços anunciados externamente
dos agentes Kafka no cluster diferem das interfaces de rede internas
que o Kafka usa. Nesses cenários, pode usar a propriedade advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Os clientes externos estabelecem ligação através da porta 9093 através de um canal "SSL" e os clientes internos estabelecem ligação através da porta 9092 através de um canal de texto simples. Quando especificar um endereço em advertised.listeners
, use nomes DNS (kafkabroker-n.mydomain.com
, neste exemplo) que sejam resolvidos para a mesma instância para tráfego externo e interno. A utilização de endereços IP públicos pode não funcionar, uma vez que os endereços podem não ser resolvidos para tráfego interno.
Registo
O registo de KafkaIO
pode ser bastante detalhado. Considere reduzir o nível de registo na produção da seguinte forma:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Para mais informações, consulte o artigo Defina os níveis de registo do trabalhador do pipeline.
O que se segue?
- Ler a partir do Apache Kafka.
- Saiba mais sobre a E/S gerida.