Confluent Kafka client


Confluent's Python Client for Apache Kafka™

confluent-kafka-python provides a high-level Producer, Consumer and AdminClient compatible with all Apache Kafka™ brokers >= v0.8, Confluent Cloud and the Confluent Platform. The client is:

  • Reliable - It's a wrapper around librdkafka (provided automatically via binary wheels) which is widely deployed in a diverse set of production scenarios. It's tested using the same set of system tests as the Java client and more. It's supported by Confluent.

  • Performant - Performance is a key design consideration. Maximum throughput is on par with the Java client for larger message sizes (where the overhead of the Python interpreter has less impact). Latency is on par with the Java client.

  • Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.

See the API documentation for more info.

Below are some examples of typical usage. For more examples, see the examples directory or the confluentinc/examples github repo for a Confluent Cloud example.

Producer

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for data in some_data_source:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()

High-level Consumer

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

AvroProducer

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     { "name" : "name", "type" : "string" }
   ]
}
"""

key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     { "name" : "name", "type" : "string" }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"name": "Value"}
key = {"name": "Key"}

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

avroProducer = AvroProducer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://schema_registry_host:port'
}, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()

AvroConsumer

from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

c = AvroConsumer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'group.id': 'groupid',
    'schema.registry.url': 'http://schema_registry_host:port'})

c.subscribe(['my_topic'])

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()

AdminClient

Create topics:

from confluent_kafka.admin import AdminClient, NewTopic

a = AdminClient({'bootstrap.servers': 'mybroker'})

new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in ["topic1", "topic2"]]
# Note: In a multi-cluster production scenario, it is more typical to use a replication_factor of 3 for durability.

# Call create_topics to asynchronously create topics. A dict
# of <topic,future> is returned.
fs = a.create_topics(new_topics)

# Wait for each operation to finish.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

Thread Safety

The Producer, Consumer and AdminClient are all thread safe.
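For example, a single Producer instance can safely be shared by several threads. A minimal sketch (not from the original README; the broker address and topic name are placeholders):

import threading
from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'mybroker'})

def worker(n):
    # Each thread produces through the same, shared Producer instance.
    for i in range(100):
        p.produce('mytopic', 'thread {} message {}'.format(n, i).encode('utf-8'))
        p.poll(0)

threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
p.flush()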

Install self-contained binary wheels
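The wheels are installed with pip:

pip install confluent-kafka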

NOTE: The pre-built Linux wheels do NOT contain SASL Kerberos/GSSAPI support. If you need SASL Kerberos/GSSAPI support you must install librdkafka and its dependencies using the repositories below and then build confluent-kafka using the command in the "Install from source from PyPi" section below.

Install AvroProducer and AvroConsumer
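To pull in the Avro serialization dependencies used by the AvroProducer and AvroConsumer examples above, install the package's avro extra (assuming the standard extra name):

pip install "confluent-kafka[avro]"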

Install from source from PyPI (requires librdkafka + dependencies to be installed separately):

For source install, see Prerequisites below.
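Assuming librdkafka and its development headers are already installed, a source build can be forced by telling pip to skip the pre-built wheel, for example:

pip install --no-binary :all: confluent-kafka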

The Python client (as well as the underlying C library librdkafka) supports all broker versions >= 0.8. But due to the nature of the Kafka protocol in broker versions 0.8 and 0.9 it is not safe for a client to assume what protocol version is actually supported by the broker, thus you will need to hint the Python client what protocol version it may use. This is done through two configuration settings:

  • broker.version.fallback=YOUR_BROKER_VERSION (default 0.9.0.1)
  • api.version.request=true|false (default true)

When using a Kafka broker 0.10 or later you don't need to do anything (api.version.request=true is the default). If you use Kafka broker 0.9 or 0.8 you must set api.version.request=false and set broker.version.fallback to your broker version, e.g. broker.version.fallback=0.9.0.1.
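In client configuration this hint might look like the following sketch (the broker address and version are placeholders; api.version.request and broker.version.fallback are librdkafka properties):

from confluent_kafka import Producer

# Sketch: talking to an older (0.8/0.9) broker by disabling the ApiVersion
# request and falling back to a known broker version.
p = Producer({
    'bootstrap.servers': 'mybroker',
    'api.version.request': False,
    'broker.version.fallback': '0.9.0.1',
})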

More info here: https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility

If you're connecting to a Kafka cluster through SSL you will need to configure the client with 'security.protocol': 'SSL' (or 'SASL_SSL' if SASL authentication is used).

The client will use CA certificates to verify the broker's certificate. The embedded OpenSSL library will look for CA certificates in its default locations (e.g. /usr/lib/ssl/certs/ or /usr/lib/ssl/cacert.pem). CA certificates are typically provided by the Linux distribution's ca-certificates package, which needs to be installed through apt, yum, et al.

If your system stores CA certificates in another location you will need to configure the client with 'ssl.ca.location': '/path/to/cacert.pem'.

Alternatively, the CA certificates can be provided by the certifi Python package. To use certifi, add an import certifi line and configure the client's CA location with 'ssl.ca.location': certifi.where().
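Put together, a minimal SSL configuration using certifi might look like this sketch (the broker address is a placeholder):

import certifi
from confluent_kafka import Producer

# Sketch: verify the broker's certificate against certifi's CA bundle.
p = Producer({
    'bootstrap.servers': 'mybroker:9093',
    'security.protocol': 'SSL',          # or 'SASL_SSL' when SASL authentication is used
    'ssl.ca.location': certifi.where(),
})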

Prerequisites

  • Python >= 2.7 or Python 3.x
  • librdkafka (the latest release is embedded in the wheels)

librdkafka is embedded in the macosx and manylinux wheels. For other platforms, for SASL Kerberos/GSSAPI support, or when a specific version of librdkafka is desired, install librdkafka separately and build from source as described above.

License

Apache License v2.0

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-python. confluent-kafka-python has no affiliation with and is not endorsed by The Apache Software Foundation.

Instructions on building and testing confluent-kafka-python can be found here.

Source: https://github.com/confluentinc/confluent-kafka-python

Clients¶


Clients make it fast and easy to produce and consume messages through Kafka.


Consumer and producer


Kafka consumer

Java consumer that is included with Apache Kafka.


Kafka producer

Java producer that is included with Apache Kafka.

Clients


Kafka Java Client

Docs | API | GitHub

Java producer and consumer shipped with Apache Kafka.


Kafka C/C++ Client

Docs | API | GitHub

librdkafka, a C/C++ library that offers a producer and a consumer for Kafka.


Kafka Python Client

Docs | API | GitHub

Python client that provides a high-level producer, consumer, and AdminClient.


Kafka Go Client

Docs | API | GitHub

Go client that offers a producer and a consumer for Kafka.


Kafka .NET Client

Docs | API | GitHub

.NET client that provides a high-level producer, consumer and AdminClient.


Confluent JMS Client

Docs

Allows Kafka or Confluent Platform to be used as a JMS message broker.


Source: https://docs.confluent.io/home/clients/overview.html

Kafka Clients¶

Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.


This section discusses the Kafka clients included with the Confluent Platform including the Java client, the high-performance C/C++ client called librdkafka, and the Python client confluent-kafka-python.

Tip

You may also want to take a look at Kafka Streams, which is a powerful, easy-to-use library for building highly scalable, fault-tolerant, distributed stream processing applications on top of Kafka.

Source: https://docs.confluent.io//clients/index.html

Kafka Clients¶

This section describes the clients included with Confluent Platform.

Confluent Platform includes client libraries for multiple languages that provide both low-level access to Apache Kafka® and higher level stream processing.

Feature Support¶

The following table describes the client support for various Confluent Platform features.

Feature                            | C/C++ | Go  | Java | .NET | Python
Admin API                          | Yes   | Yes | Yes  | Yes  | Yes
Control Center metrics integration | Yes   | Yes | Yes  | Yes  | Yes
Custom partitioner                 | Yes   | No  | Yes  | No   | No
Exactly Once Semantics             | Yes   | Yes | Yes  | Yes  | Yes
Idempotent Producer                | Yes   | Yes | Yes  | Yes  | Yes
Kafka Streams                      | No    | No  | Yes  | No   | No
Record Headers                     | Yes   | Yes | Yes  | Yes  | Yes
SASL Kerberos/GSSAPI               | Yes   | Yes | Yes  | Yes  | Yes
SASL PLAIN                         | Yes   | Yes | Yes  | Yes  | Yes
SASL SCRAM                         | Yes   | Yes | Yes  | Yes  | Yes
SASL OAUTHBEARER                   | Yes   | Yes | Yes  | No   | No
Simplified installation            | Yes   | Yes | Yes  | Yes  | Yes
Schema Registry                    | Yes   | No  | Yes  | Yes  | Yes
Topic Metadata API                 | Yes   | Yes | Yes  | Yes  | Yes

Other Languages¶

There are many programming languages that provide Kafka client libraries. The following “Hello, World!” examples are written in various languages to demonstrate how to produce to and consume from an Apache Kafka® cluster, which can be in Confluent Cloud, on your local host, or any other Kafka cluster. For the subset of languages that support it, the code examples also demonstrate how to use Avro and Confluent Schema Registry.

There are additional examples for Kafka and Confluent Platform command line tools and components.



Source: https://docs.confluent.io/platform/current/clients/index.html


Kafka .NET Client¶

Confluent develops and maintains confluent-kafka-dotnet, a .NET library that provides a high-level Producer, Consumer and AdminClient compatible with all Kafka brokers >= v0.8, Confluent Cloud and Confluent Platform.

.NET Client Installation¶

confluent-kafka-dotnet is made available via NuGet. It’s a binding to the C client librdkafka, which is provided automatically via the dependent librdkafka.redist package for a number of popular platforms (win-x64, win-x86, debian-x64, rhel-x64 and osx).

To reference confluent-kafka-dotnet from within a Visual Studio project, run the following command in the Package Manager Console:

PM> Install-Package Confluent.Kafka

Note

The dependent librdkafka.redist package will be installed automatically.

To reference confluent-kafka-dotnet in a .NET Core project, execute the following command in your project’s directory:

> dotnet add package Confluent.Kafka

confluent-kafka-dotnet is compatible with the .NET Framework, .NET Core (on Windows, Linux and Mac) and .NET Standard. Mono is not supported.

In addition to the Confluent.Kafka package, there are companion packages for integration with Confluent Schema Registry. For more information, refer to Working With Apache Avro in the repo README file.

.NET Client example code¶

For Hello World examples of Kafka clients in .NET, see .NET. All examples include a producer and consumer that can connect to any Kafka cluster running on-premises or in Confluent Cloud.

Producer¶

To create a .NET Producer, first construct an instance of the strongly typed ProducerConfig class, then pass this into the ProducerBuilder's constructor:

using Confluent.Kafka;
using System.Net;

var config = new ProducerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    ClientId = Dns.GetHostName(),
    ...
};

using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    ...
}

To write messages to Kafka, use either the ProduceAsync or the Produce method.

ProduceAsync is very useful in highly concurrent scenarios, for example in ASP.NET request handlers:

await producer.ProduceAsync("weblog", new Message<Null, string> { Value = "a log message" });

Unless your application is highly concurrent though, it’s best to avoid synchronous execution like the above as it will reduce maximum throughput enormously.

To asynchronously handle delivery result notifications, you can use ContinueWith on the returned Task:

var t = producer.ProduceAsync("topic", new Message<Null, string> { Value = "hello world" });
t.ContinueWith(task =>
{
    if (task.IsFaulted)
    {
        ...
    }
    else
    {
        Console.WriteLine($"Wrote to offset: {task.Result.Offset}");
    }
});

Or you can use the Produce method, which takes a delivery handler delegate as a parameter:

public static void handler(DeliveryReport<Null, string> deliveryReport)
{
    ...
}

public static void process()
{
    ...
    producer.Produce("my-topic", new Message<Null, string> { Value = "hello world" }, handler);
}

There are a couple of additional benefits of using the Produce method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ProduceAsync, this is not the case because the returned Task may complete on any thread pool thread. Second, Produce is more performant because there is unavoidable overhead in the higher-level Task-based API.

Consumer¶

Initialization¶

To create a .NET Consumer, first construct an instance of the strongly typed ConsumerConfig class, then pass this into the ConsumerBuilder's constructor:

using System.Collections.Generic;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    GroupId = "foo",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    ...
}

The GroupId property is mandatory and specifies which consumer group the consumer is a member of. The AutoOffsetReset property specifies what offset the consumer should start reading from in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation).

The Consume Loop¶

A typical Kafka consumer application is centered around a consume loop, which repeatedly calls the Consume method to retrieve records one-by-one that have been efficiently pre-fetched by the consumer in background threads. Before entering the consume loop, you'll typically use the Subscribe method to specify which topics should be fetched from:

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    consumer.Subscribe(topics);

    while (!cancelled)
    {
        var consumeResult = consumer.Consume(cancellationToken);

        // handle consumed message.
        ...
    }

    consumer.Close();
}

Note that disposing the consumer instance after you are finished using it (achieved with the using block in the above example) will ensure that active sockets are closed and internal state is cleaned up. In order to leave the group cleanly - i.e. commit final offsets and trigger a group rebalance which ensures that any partitions owned by the consumer are re-assigned to other members in the group in a timely fashion - you additionally need to call the Close method prior to disposing.

Auto Offset Commit¶

By default, the .NET Consumer will commit offsets automatically. This is done periodically by a background thread at an interval specified by the AutoCommitIntervalMs config property. An offset becomes eligible to be committed immediately prior to being delivered to the application via the Consume method.

This strategy introduces the potential for messages to be missed in the case of application failure because the application may terminate before it finishes processing a particular message, whilst the offset corresponding to that message may be successfully committed to Kafka by the background thread.

Furthermore, this strategy may also introduce duplicate processing in the case of application failure since offsets are only committed periodically.

Synchronous Commits¶

The C# client allows you to commit offsets explicitly via the Commit method. In the following example, a synchronous commit is triggered every commitPeriod messages:

var config = new ConsumerConfig
{
    // Disable auto-committing of offsets.
    EnableAutoCommit = false
};

while (!cancelled)
{
    var consumeResult = consumer.Consume(cancellationToken);

    // process message here.

    if (consumeResult.Offset % commitPeriod == 0)
    {
        try
        {
            consumer.Commit(consumeResult);
        }
        catch (KafkaException e)
        {
            Console.WriteLine($"Commit error: {e.Error.Reason}");
        }
    }
}

This approach gives “at least once” delivery semantics since the offset corresponding to a message is only committed after the message has been successfully processed.

If you reverse the order of the processing and commit, as well as commit before every message (not just periodically), you will get “at most once” delivery semantics.

Note

You should generally avoid blocking network calls (including synchronous use of Commit) because of the ramifications for throughput.

Store Offsets¶

The auto offset commit capability in the .NET Client is actually quite flexible. As outlined above, by default, the offsets to be committed to Kafka are updated immediately prior to the Consume method delivering messages to the application. However, you can prevent this from happening by setting the EnableAutoOffsetStore config property to false. You can then use the StoreOffset method to specify the offsets you would like the background thread to commit, and you can call this precisely when you want. This approach is preferred over the synchronous commit approach outlined in the previous section.

The below example uses this approach to achieve at least once delivery semantics without blocking the main processing loop:

var config = new ConsumerConfig
{
    EnableAutoCommit = true,      // (the default)
    EnableAutoOffsetStore = false
};

while (!cancelled)
{
    var consumeResult = consumer.Consume(cancellationToken);

    // process message here.

    consumer.StoreOffset(consumeResult);
}


Source: https://docs.confluent.io/clients-confluent-kafka-dotnet/current/overview.html


Kafka Python Client¶

Confluent develops and maintains confluent-kafka-python, a Python Client for Apache Kafka® that provides a high-level Producer, Consumer and AdminClient compatible with all Kafka brokers >= v0.8, Confluent Cloud and Confluent Platform.

Python Client installation¶

The client is available on PyPI and can be installed using pip:

pip install confluent-kafka

You can install it globally, or within a virtualenv.

Note

The confluent-kafka Python package is a binding on top of the C client librdkafka. It comes bundled with a pre-built version of librdkafka which does not include GSSAPI/Kerberos support. For information on how to install a version that supports GSSAPI, see the installation instructions.

Python Client demo code¶

For Hello World examples of Kafka clients in Python, see Python. All examples include a producer and consumer that can connect to any Kafka cluster running on-premises or in Confluent Cloud. They also include examples of how to produce and consume Avro data with Schema Registry.

Kafka Producer¶

Initialization¶

The Producer is configured using a dictionary as follows:

from confluent_kafka import Producer
import socket

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'client.id': socket.gethostname()}

producer = Producer(conf)

For information on the available configuration properties, refer to the API Documentation.

Asynchronous writes¶

To initiate sending a message to Kafka, call the produce method, passing in the message value (which may be None) and optionally a key, partition, and callback. The produce call will complete immediately and does not return a value. An exception will be thrown if the message could not be enqueued due to librdkafka's local produce queue being full.

producer.produce(topic, key="key", value="value")

To receive notification of delivery success or failure, you can pass a callback parameter. This can be any callable, for example, a lambda, function, bound method, or callable object. Although the produce() method enqueues the message immediately for batching, compression and transmission to the broker, no delivery notification events will be propagated until poll() is invoked.

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

producer.produce(topic, key="key", value="value", callback=acked)

# Wait up to 1 second for events. Callbacks will be invoked during
# this method call if the message is acknowledged.
producer.poll(1)

Synchronous writes¶

The Python client provides a flush() method which can be used to make writes synchronous. This is typically a bad idea since it effectively limits throughput to the broker round trip time, but may be justified in some cases.

producer.produce(topic, key="key", value="value")
producer.flush()

Typically, flush() should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered.

Kafka Consumer¶

Initialization¶

The Consumer is configured using a dictionary as follows:

from confluent_kafka import Consumer

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'auto.offset.reset': 'smallest'}

consumer = Consumer(conf)

The group.id property is mandatory and specifies which consumer group the consumer is a member of. The auto.offset.reset property specifies what offset the consumer should start reading from in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation).

This is another example with enable.auto.commit configured to False in the consumer. The default value is True.

from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'host1:9092,host2:9092',
        'group.id': "foo",
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)

For information on the available configuration properties, refer to the API Documentation.

Python Client code examples¶

Basic poll loop¶

A typical Kafka consumer application is centered around a consume loop, which repeatedly calls the poll method to retrieve records one-by-one that have been efficiently pre-fetched by the consumer behind the scenes. Before entering the consume loop, you'll typically use the subscribe method to specify which topics should be fetched from:

import sys

from confluent_kafka import KafkaError, KafkaException

running = True

def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

def shutdown():
    running = False

The poll timeout is hard-coded to 1 second. If no records are received before this timeout expires, poll() returns None, which the loop above simply skips.

Note that you should always call close() after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If not closed properly, the broker will trigger the rebalance only after the session timeout has expired.

Synchronous commits¶

The simplest and most reliable way to manually commit offsets is by setting the asynchronous parameter to the commit() method call. This method can also accept the mutually exclusive keyword parameters offsets, to explicitly list the offsets for each assigned topic partition, and message, which will commit offsets relative to a Message object returned by poll().

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=False)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

In this example, a synchronous commit is triggered every MIN_COMMIT_COUNT messages. The asynchronous flag controls whether this call is asynchronous. You could also trigger the commit on expiration of a timeout to ensure the committed position is updated regularly.

Delivery guarantees¶

In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery, but you must be a little careful with the commit failure.

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                consumer.commit(asynchronous=False)
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

For simplicity in this example, a synchronous commit is used prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.
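A rough sketch of that batching approach, reusing the loop helpers above (msg_process and BATCH_SIZE are illustrative names, not part of the client API):

def consume_batch_at_most_once(consumer, topics):
    try:
        consumer.subscribe(topics)
        while running:
            # Collect a batch of messages first.
            batch = []
            while len(batch) < BATCH_SIZE:
                msg = consumer.poll(timeout=1.0)
                if msg is None:
                    break
                if msg.error():
                    raise KafkaException(msg.error())
                batch.append(msg)
            if not batch:
                continue
            # Commit before processing ("at most once"); process only if the commit succeeded.
            consumer.commit(asynchronous=False)
            for msg in batch:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()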

Asynchronous Commits¶

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=True)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

In this example, the consumer sends the request and returns immediately by using asynchronous commits. The asynchronous parameter to commit() is changed to True. The value is passed in explicitly, but asynchronous commits are the default if the parameter is not included.

The API gives you a callback which is invoked when the commit either succeeds or fails. The commit callback can be any callable and can be passed as the on_commit configuration parameter to the consumer constructor.

from confluent_kafka import Consumer

def commit_completed(err, partitions):
    if err:
        print(str(err))
    else:
        print("Committed partition offsets: " + str(partitions))

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'on_commit': commit_completed}

consumer = Consumer(conf)

API documentation¶

Click here to view the Python Client API documentation.


Source: https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html

