Class ConsumerBuilder<TKey, TValue>
A builder class for IConsumer<TKey, TValue>.
Namespace: Confluent.Kafka
Assembly: Confluent.Kafka.dll
Syntax
public class ConsumerBuilder<TKey, TValue>
Type Parameters
Name | Description |
---|---|
TKey | The type of the message key. |
TValue | The type of the message value. |
Constructors
ConsumerBuilder(IEnumerable<KeyValuePair<String, String>>)
Initialize a new ConsumerBuilder instance.
Declaration
public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
Parameters
Type | Name | Description |
---|---|---|
System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<System.String, System.String>> | config | A collection of librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) and parameters specific to this client (refer to: ConfigPropertyNames). At a minimum, 'bootstrap.servers' and 'group.id' must be specified. |
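As a minimal sketch of the constructor, the following passes the two required properties as a plain dictionary and builds a consumer. The broker address `localhost:9092` and the group id `example-group` are placeholder assumptions, not values from this reference.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

class Program
{
    static void Main()
    {
        // Placeholder values (assumptions): adjust for your own cluster.
        var config = new Dictionary<string, string>
        {
            ["bootstrap.servers"] = "localhost:9092",
            ["group.id"] = "example-group"
        };

        // Build() creates the consumer; dispose it when finished.
        using (IConsumer<Ignore, string> consumer =
            new ConsumerBuilder<Ignore, string>(config).Build())
        {
            Console.WriteLine($"Created consumer: {consumer.Name}");
        }
    }
}
```

A strongly typed `ConsumerConfig` instance can be passed instead of a dictionary, since it is also a sequence of string key/value pairs.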
Properties
Config
The config dictionary.
Declaration
protected IEnumerable<KeyValuePair<string, string>> Config { get; set; }
Property Value
Type | Description |
---|---|
System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<System.String, System.String>> |
ErrorHandler
The configured error handler.
Declaration
protected Action<IConsumer<TKey, TValue>, Error> ErrorHandler { get; set; }
Property Value
Type | Description |
---|---|
System.Action<IConsumer<TKey, TValue>, Error> |
KeyDeserializer
The configured key deserializer.
Declaration
protected IDeserializer<TKey> KeyDeserializer { get; set; }
Property Value
Type | Description |
---|---|
IDeserializer<TKey> |
LogHandler
The configured log handler.
Declaration
protected Action<IConsumer<TKey, TValue>, LogMessage> LogHandler { get; set; }
Property Value
Type | Description |
---|---|
System.Action<IConsumer<TKey, TValue>, LogMessage> |
OAuthBearerTokenRefreshHandler
The configured OAuthBearer Token Refresh handler.
Declaration
protected Action<IConsumer<TKey, TValue>, string> OAuthBearerTokenRefreshHandler { get; set; }
Property Value
Type | Description |
---|---|
System.Action<IConsumer<TKey, TValue>, System.String> |
OffsetsCommittedHandler
The configured offsets committed handler.
Declaration
protected Action<IConsumer<TKey, TValue>, CommittedOffsets> OffsetsCommittedHandler { get; set; }
Property Value
Type | Description |
---|---|
System.Action<IConsumer<TKey, TValue>, CommittedOffsets> |
PartitionsAssignedHandler
The configured partitions assigned handler.
Declaration
protected Func<IConsumer<TKey, TValue>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>> PartitionsAssignedHandler { get; set; }
Property Value
Type | Description |
---|---|
System.Func<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartition>, System.Collections.Generic.IEnumerable<TopicPartitionOffset>> |
PartitionsRevokedHandler
The configured partitions revoked handler.
Declaration
protected Func<IConsumer<TKey, TValue>, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>> PartitionsRevokedHandler { get; set; }
Property Value
Type | Description |
---|---|
System.Func<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartitionOffset>, System.Collections.Generic.IEnumerable<TopicPartitionOffset>> |
StatisticsHandler
The configured statistics handler.
Declaration
protected Action<IConsumer<TKey, TValue>, string> StatisticsHandler { get; set; }
Property Value
Type | Description |
---|---|
System.Action<IConsumer<TKey, TValue>, System.String> |
ValueDeserializer
The configured value deserializer.
Declaration
protected IDeserializer<TValue> ValueDeserializer { get; set; }
Property Value
Type | Description |
---|---|
IDeserializer<TValue> |
Methods
Build()
Build a new IConsumer implementation instance.
Declaration
public virtual IConsumer<TKey, TValue> Build()
Returns
Type | Description |
---|---|
IConsumer<TKey, TValue> |
SetErrorHandler(Action<IConsumer<TKey, TValue>, Error>)
Set the handler to call on error events, e.g. connection failures or all brokers down. Note that the client will automatically try to recover from errors that are not marked as fatal. Non-fatal errors should be interpreted as informational rather than catastrophic.
Declaration
public ConsumerBuilder<TKey, TValue> SetErrorHandler(Action<IConsumer<TKey, TValue>, Error> errorHandler)
Parameters
Type | Name | Description |
---|---|---|
System.Action<IConsumer<TKey, TValue>, Error> | errorHandler |
Returns
Type | Description |
---|---|
ConsumerBuilder<TKey, TValue> |
Remarks
Executes as a side-effect of the Consume method (on the same thread).
Exceptions: Any exception thrown by your error handler will be silently ignored.
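One possible way to act on the fatal/non-fatal distinction described above is sketched below. The class name `ErrorHandlerExample` and the choice to write to stderr are illustrative assumptions.

```csharp
using System;
using Confluent.Kafka;

static class ErrorHandlerExample
{
    public static IConsumer<Ignore, string> Create(ConsumerConfig config)
    {
        return new ConsumerBuilder<Ignore, string>(config)
            .SetErrorHandler((_, error) =>
            {
                // Non-fatal errors are informational; the client retries on its own.
                var severity = error.IsFatal ? "FATAL" : "info";
                Console.Error.WriteLine($"[{severity}] {error.Code}: {error.Reason}");
            })
            .Build();
    }
}
```

Because exceptions thrown by the handler are ignored, any escalation (for example, stopping the application on a fatal error) would have to be signalled by other means, such as setting a flag that the consume loop checks.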
SetKeyDeserializer(IDeserializer<TKey>)
Set the deserializer to use to deserialize keys.
Declaration
public ConsumerBuilder<TKey, TValue> SetKeyDeserializer(IDeserializer<TKey> deserializer)
Parameters
Type | Name | Description |
---|---|---|
IDeserializer<TKey> | deserializer |
Returns
Type | Description |
---|---|
ConsumerBuilder<TKey, TValue> |
Remarks
If your key deserializer throws an exception, this will be wrapped in a ConsumeException with ErrorCode Local_KeyDeserialization and thrown by the initiating call to Consume.
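The sketch below shows where a key deserialization failure would surface. It assumes string keys decoded with the built-in `Deserializers.Utf8`; the class name, the five-second timeout, and the `topic` parameter are hypothetical. The built-in UTF-8 deserializer is used only for brevity; a custom deserializer is the more likely source of such exceptions.

```csharp
using System;
using Confluent.Kafka;

static class KeyDeserializerExample
{
    public static void ConsumeOnce(ConsumerConfig config, string topic)
    {
        using (var consumer = new ConsumerBuilder<string, string>(config)
            .SetKeyDeserializer(Deserializers.Utf8)
            .Build())
        {
            consumer.Subscribe(topic);
            try
            {
                // Returns null if no message arrives within the timeout.
                var result = consumer.Consume(TimeSpan.FromSeconds(5));
                if (result != null)
                {
                    Console.WriteLine($"key={result.Message.Key}");
                }
            }
            catch (ConsumeException e) when (e.Error.Code == ErrorCode.Local_KeyDeserialization)
            {
                // The deserializer's exception is wrapped by the client.
                Console.Error.WriteLine($"Bad key: {e.Error.Reason}");
            }
            finally
            {
                consumer.Close();
            }
        }
    }
}
```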
SetLogHandler(Action<IConsumer<TKey, TValue>, LogMessage>)
Set the handler to call when there is information available to be logged. If not specified, a default callback that writes to stderr will be used.
Declaration
public ConsumerBuilder<TKey, TValue> SetLogHandler(Action<IConsumer<TKey, TValue>, LogMessage> logHandler)
Parameters
Type | Name | Description |
---|---|---|
System.Action<IConsumer<TKey, TValue>, LogMessage> | logHandler |
Returns
Type | Description |
---|---|
ConsumerBuilder<TKey, TValue> |
Remarks
By default not many log messages are generated.
For more verbose logging, specify one or more debug contexts using the 'Debug' configuration property.
Warning: Log handlers are called spontaneously from internal librdkafka threads and the application must not call any Confluent.Kafka APIs from within a log handler or perform any prolonged operations.
Exceptions: Any exception thrown by your log handler will be silently ignored.
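A minimal sketch of a log handler that respects the warning above: it only formats and writes the message, and calls no client APIs. The debug contexts `consumer,cgrp` are an illustrative choice, and `LogHandlerExample` is a hypothetical name.

```csharp
using System;
using Confluent.Kafka;

static class LogHandlerExample
{
    public static IConsumer<Ignore, string> Create(ConsumerConfig config)
    {
        // Optional: request more verbose librdkafka logging.
        config.Debug = "consumer,cgrp";

        return new ConsumerBuilder<Ignore, string>(config)
            .SetLogHandler((_, log) =>
            {
                // Keep this short and do not call Confluent.Kafka APIs here.
                Console.WriteLine($"{log.Level} {log.Facility} {log.Name}: {log.Message}");
            })
            .Build();
    }
}
```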
SetOAuthBearerTokenRefreshHandler(Action<IConsumer<TKey, TValue>, String>)
Set SASL/OAUTHBEARER token refresh callback in provided conf object. The SASL/OAUTHBEARER token refresh callback is triggered via Consume(Int32) (or any of its overloads) whenever OAUTHBEARER is the SASL mechanism and a token needs to be retrieved, typically based on the configuration defined in sasl.oauthbearer.config. The callback should invoke OAuthBearerSetToken(IClient, String, Int64, String, IDictionary<String, String>) or OAuthBearerSetTokenFailure(IClient, String) to indicate success or failure, respectively.
An unsecured JWT refresh handler is provided by librdkafka for development and testing purposes. It is enabled by setting the enable.sasl.oauthbearer.unsecure.jwt property to true and is mutually exclusive with using a refresh callback.
Declaration
public ConsumerBuilder<TKey, TValue> SetOAuthBearerTokenRefreshHandler(Action<IConsumer<TKey, TValue>, string> oAuthBearerTokenRefreshHandler)
Parameters
Type | Name | Description |
---|---|---|
System.Action<IConsumer<TKey, TValue>, System.String> | oAuthBearerTokenRefreshHandler | The callback to set. Callback arguments: (1) IConsumer: the consumer instance, which should be used to set the token or token failure; (2) string: the value of the configuration property sasl.oauthbearer.config. |
Returns
Type | Description |
---|---|
ConsumerBuilder<TKey, TValue> |
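A refresh handler along these lines might look like the following sketch. `FetchToken` is a hypothetical stand-in for a call to your identity provider, and the 30-minute expiry and the principal name `example-principal` are assumptions; in practice both would come from the issued token.

```csharp
using System;
using Confluent.Kafka;

static class OAuthExample
{
    // Hypothetical helper: replace with a call to your identity provider.
    static string FetchToken(string oauthConfig) => throw new NotImplementedException();

    public static IConsumer<Ignore, string> Create(ConsumerConfig config)
    {
        return new ConsumerBuilder<Ignore, string>(config)
            .SetOAuthBearerTokenRefreshHandler((client, oauthConfig) =>
            {
                try
                {
                    string token = FetchToken(oauthConfig);
                    // Assumed lifetime: expiry expressed as milliseconds since the epoch.
                    long expiresAtMs = DateTimeOffset.UtcNow.AddMinutes(30).ToUnixTimeMilliseconds();
                    client.OAuthBearerSetToken(token, expiresAtMs, "example-principal");
                }
                catch (Exception e)
                {
                    // Report the failure so the client can retry later.
                    client.OAuthBearerSetTokenFailure(e.ToString());
                }
            })
            .Build();
    }
}
```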
SetOffsetsCommittedHandler(Action<IConsumer<TKey, TValue>, CommittedOffsets>)
A handler that is called to report the result of (automatic) offset commits. It is not called as a result of the use of the Commit method.
Declaration
public ConsumerBuilder<TKey, TValue> SetOffsetsCommittedHandler(Action<IConsumer<TKey, TValue>, CommittedOffsets> offsetsCommittedHandler)
Parameters
Type | Name | Description |
---|---|---|
System.Action<IConsumer<TKey, TValue>, CommittedOffsets> | offsetsCommittedHandler |
Returns
Type | Description |
---|---|
ConsumerBuilder<TKey, TValue> |
Remarks
Executes as a side-effect of the Consumer.Consume call (on the same thread).
Exceptions: Any exception thrown by your offsets committed handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume/Close.
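The following sketch logs the outcome of each automatic commit. The class name and the output format are illustrative assumptions.

```csharp
using System;
using Confluent.Kafka;

static class OffsetsCommittedExample
{
    public static IConsumer<Ignore, string> Create(ConsumerConfig config)
    {
        return new ConsumerBuilder<Ignore, string>(config)
            .SetOffsetsCommittedHandler((_, committed) =>
            {
                if (committed.Error.IsError)
                {
                    Console.Error.WriteLine($"Commit failed: {committed.Error.Reason}");
                    return;
                }
                foreach (var entry in committed.Offsets)
                {
                    Console.WriteLine($"Committed {entry.TopicPartition} at offset {entry.Offset}");
                }
            })
            .Build();
    }
}
```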
SetPartitionsAssignedHandler(Action<IConsumer<TKey, TValue>, List<TopicPartition>>)
This handler is called when a new consumer group partition assignment has been received by this consumer.
Note: every call to this handler is matched by a corresponding call to the partitions revoked handler (if one has been set using SetPartitionsRevokedHandler).
Consumption will resume from the last committed offset for each partition, or if there is no committed offset, in accordance with the auto.offset.reset configuration property.
Declaration
public ConsumerBuilder<TKey, TValue> SetPartitionsAssignedHandler(Action<IConsumer<TKey, TValue>, List<TopicPartition>> partitionAssignmentHandler)
Parameters
Type | Name | Description |
---|---|---|
System.Action<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartition>> | partitionAssignmentHandler |
Returns
Type | Description |
---|---|
ConsumerBuilder<TKey, TValue> |
Remarks
May execute as a side-effect of the Consumer.Consume call (on the same thread).
Assign/Unassign must not be called in the handler.
Exceptions: Any exception thrown by your partitions assigned handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume.
SetPartitionsAssignedHandler(Func<IConsumer<TKey, TValue>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>)
This handler is called when a new consumer group partition assignment has been received by this consumer.
Note: every call to this handler is matched by a corresponding call to the partitions revoked handler (if one has been set using SetPartitionsRevokedHandler).
The actual partitions to consume from and start offsets are specified by the return value of the handler. This set of partitions is not required to match the assignment provided by the consumer group, but typically will. Partition offsets may be a specific offset, or special value (Beginning, End or Unset). If Unset, consumption will resume from the last committed offset for each partition, or if there is no committed offset, in accordance with the auto.offset.reset configuration property.
Declaration
public ConsumerBuilder<TKey, TValue> SetPartitionsAssignedHandler(Func<IConsumer<TKey, TValue>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>> partitionsAssignedHandler)
Parameters
Type | Name | Description |
---|---|---|
System.Func<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartition>, System.Collections.Generic.IEnumerable<TopicPartitionOffset>> | partitionsAssignedHandler |
Returns
Type | Description |
---|---|
ConsumerBuilder<TKey, TValue> |
Remarks
May execute as a side-effect of the Consumer.Consume call (on the same thread).
Assign/Unassign must not be called in the handler.
Exceptions: Any exception thrown by your partitions assigned handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume.
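To illustrate the returning overload, this sketch accepts the assignment provided by the group but starts every partition from the beginning of its log. Choosing `Offset.Beginning` is an assumption made for the example; returning `Offset.Unset` would fall back to the committed offset as described above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

static class PartitionsAssignedExample
{
    public static IConsumer<Ignore, string> Create(ConsumerConfig config)
    {
        return new ConsumerBuilder<Ignore, string>(config)
            .SetPartitionsAssignedHandler((consumer, partitions) =>
            {
                Console.WriteLine($"Assigned: {string.Join(", ", partitions)}");

                // Keep the group's assignment, but override the start offsets.
                IEnumerable<TopicPartitionOffset> startOffsets = partitions
                    .Select(tp => new TopicPartitionOffset(tp, Offset.Beginning))
                    .ToList();
                return startOffsets;
            })
            .Build();
    }
}
```

The explicit `return` in a statement-bodied lambda makes the compiler select the `Func` overload rather than the `Action` one.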
SetPartitionsRevokedHandler(Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>>)
This handler is called immediately prior to a group partition assignment being revoked. The second parameter provides the set of partitions the consumer is currently assigned to, and the current position of the consumer on each of these partitions.
Declaration
public ConsumerBuilder<TKey, TValue> SetPartitionsRevokedHandler(Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> partitionsRevokedHandler)
Parameters
Type | Name | Description |
---|---|---|
System.Action<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartitionOffset>> | partitionsRevokedHandler |
Returns
Type | Description |
---|---|
ConsumerBuilder<TKey, TValue> |
Remarks
May execute as a side-effect of the Consumer.Consume call (on the same thread).
Assign/Unassign must not be called in the handler.
Exceptions: Any exception thrown by your partitions revoked handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume.
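A sketch of the non-returning overload that simply records the consumer's position on each partition before it is revoked. The class name and log format are illustrative.

```csharp
using System;
using Confluent.Kafka;

static class PartitionsRevokedExample
{
    public static IConsumer<Ignore, string> Create(ConsumerConfig config)
    {
        return new ConsumerBuilder<Ignore, string>(config)
            .SetPartitionsRevokedHandler((_, positions) =>
            {
                // Each entry pairs a partition with the consumer's current position.
                foreach (var tpo in positions)
                {
                    Console.WriteLine($"Revoking {tpo.TopicPartition}, position {tpo.Offset}");
                }
            })
            .Build();
    }
}
```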
SetPartitionsRevokedHandler(Func<IConsumer<TKey, TValue>, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>)
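This handler is called immediately prior to a group partition assignment being revoked. The second parameter provides the set of partitions the consumer is currently assigned to, and the current position of the consumer on each of these partitions.
The return value of the handler specifies the partitions/offsets the consumer should be assigned to following completion of this method (typically empty).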
Declaration
public ConsumerBuilder<TKey, TValue> SetPartitionsRevokedHandler(Func<IConsumer<TKey, TValue>, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>> partitionsRevokedHandler)
Parameters
Type | Name | Description |
---|---|---|
System.Func<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartitionOffset>, System.Collections.Generic.IEnumerable<TopicPartitionOffset>> | partitionsRevokedHandler |
Returns
Type | Description |
---|---|
ConsumerBuilder<TKey, TValue> |
Remarks
May execute as a side-effect of the Consumer.Consume call (on the same thread).
Assign/Unassign must not be called in the handler.
Exceptions: Any exception thrown by your partitions revoked handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume/Close.
SetStatisticsHandler(Action<IConsumer<TKey, TValue>, String>)
Set the handler to call on statistics events. Statistics are provided as a JSON formatted string as defined here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
Declaration
public ConsumerBuilder<TKey, TValue> SetStatisticsHandler(Action<IConsumer<TKey, TValue>, string> statisticsHandler)
Parameters
Type | Name | Description |
---|---|---|
System.Action<IConsumer<TKey, TValue>, System.String> | statisticsHandler |
Returns
Type | Description |
---|---|
ConsumerBuilder<TKey, TValue> |
Remarks
You can enable statistics and set the statistics interval using the StatisticsIntervalMs configuration property (disabled by default).
Executes as a side-effect of the Consume method (on the same thread).
Exceptions: Any exception thrown by your statistics handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume.
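The sketch below enables statistics and reads one field from the JSON payload. The five-second interval is an arbitrary choice, `System.Text.Json` is assumed to be available on the target framework, and the top-level `rxmsgs` field is read defensively in case it is absent.

```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;

static class StatisticsExample
{
    public static IConsumer<Ignore, string> Create(string bootstrapServers, string groupId)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupId,
            // Statistics are disabled unless an interval is set.
            StatisticsIntervalMs = 5000
        };

        return new ConsumerBuilder<Ignore, string>(config)
            .SetStatisticsHandler((_, json) =>
            {
                using (JsonDocument doc = JsonDocument.Parse(json))
                {
                    if (doc.RootElement.TryGetProperty("rxmsgs", out JsonElement rxmsgs))
                    {
                        Console.WriteLine($"Messages received so far: {rxmsgs.GetInt64()}");
                    }
                }
            })
            .Build();
    }
}
```

Since exceptions from this handler propagate out of Consume, a parsing failure here would interrupt the consume loop unless it is caught inside the handler.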
SetValueDeserializer(IDeserializer<TValue>)
Set the deserializer to use to deserialize values.
Declaration
public ConsumerBuilder<TKey, TValue> SetValueDeserializer(IDeserializer<TValue> deserializer)
Parameters
Type | Name | Description |
---|---|---|
IDeserializer<TValue> | deserializer |
Returns
Type | Description |
---|---|
ConsumerBuilder<TKey, TValue> |
Remarks
If your value deserializer throws an exception, this will be wrapped in a ConsumeException with ErrorCode Local_ValueDeserialization and thrown by the initiating call to Consume.
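As an illustration of a custom value deserializer, the hypothetical `TextInt32Deserializer` below interprets each payload as a UTF-8 decimal integer. Any exception it throws (for example on a null or non-numeric payload) would reach the caller wrapped in a ConsumeException, as described above.

```csharp
using System;
using System.Text;
using Confluent.Kafka;

// Hypothetical deserializer: parses the payload as a UTF-8 decimal integer.
sealed class TextInt32Deserializer : IDeserializer<int>
{
    public int Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull)
        {
            throw new ArgumentNullException(nameof(data), "Null payloads are not supported.");
        }
        return int.Parse(Encoding.UTF8.GetString(data.ToArray()));
    }
}

static class ValueDeserializerExample
{
    public static IConsumer<Ignore, int> Create(ConsumerConfig config)
    {
        return new ConsumerBuilder<Ignore, int>(config)
            .SetValueDeserializer(new TextInt32Deserializer())
            .Build();
    }
}
```

A caller could then filter on `ErrorCode.Local_ValueDeserialization` in a `catch (ConsumeException e)` block to skip or log malformed records.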