librdkafka
The Apache Kafka C/C++ client library
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
rdkafkacpp.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKACPP_H_
30 #define _RDKAFKACPP_H_
31 
50 #include <string>
51 #include <list>
52 #include <vector>
53 #include <cstdlib>
54 #include <cstring>
55 #include <stdint.h>
56 #include <sys/types.h>
57 
58 #ifdef _WIN32
59 #ifndef ssize_t
60 #ifndef _BASETSD_H_
61 #include <basetsd.h>
62 #endif
63 #ifndef _SSIZE_T_DEFINED
64 #define _SSIZE_T_DEFINED
65 typedef SSIZE_T ssize_t;
66 #endif
67 #endif
68 #undef RD_EXPORT
69 #ifdef LIBRDKAFKA_STATICLIB
70 #define RD_EXPORT
71 #else
72 #ifdef LIBRDKAFKACPP_EXPORTS
73 #define RD_EXPORT __declspec(dllexport)
74 #else
75 #define RD_EXPORT __declspec(dllimport)
76 #endif
77 #endif
78 #else
79 #define RD_EXPORT
80 #endif
81 
84 extern "C" {
85  /* Forward declarations */
86  struct rd_kafka_s;
87  struct rd_kafka_topic_s;
88  struct rd_kafka_message_s;
89  struct rd_kafka_conf_s;
90  struct rd_kafka_topic_conf_s;
91 }
92 
93 namespace RdKafka {
94 
114 #define RD_KAFKA_VERSION 0x010502ff
115 
121 RD_EXPORT
122 int version ();
123 
127 RD_EXPORT
128 std::string version_str();
129 
134 RD_EXPORT
135 std::string get_debug_contexts();
136 
146 RD_EXPORT
147 int wait_destroyed(int timeout_ms);
148 
149 
172 enum ErrorCode {
173  /* Internal errors to rdkafka: */
175  ERR__BEGIN = -200,
177  ERR__BAD_MSG = -199,
179  ERR__BAD_COMPRESSION = -198,
181  ERR__DESTROY = -197,
183  ERR__FAIL = -196,
185  ERR__TRANSPORT = -195,
187  ERR__CRIT_SYS_RESOURCE = -194,
189  ERR__RESOLVE = -193,
191  ERR__MSG_TIMED_OUT = -192,
196  ERR__PARTITION_EOF = -191,
198  ERR__UNKNOWN_PARTITION = -190,
200  ERR__FS = -189,
202  ERR__UNKNOWN_TOPIC = -188,
204  ERR__ALL_BROKERS_DOWN = -187,
206  ERR__INVALID_ARG = -186,
208  ERR__TIMED_OUT = -185,
210  ERR__QUEUE_FULL = -184,
212  ERR__ISR_INSUFF = -183,
214  ERR__NODE_UPDATE = -182,
216  ERR__SSL = -181,
218  ERR__WAIT_COORD = -180,
220  ERR__UNKNOWN_GROUP = -179,
222  ERR__IN_PROGRESS = -178,
224  ERR__PREV_IN_PROGRESS = -177,
226  ERR__EXISTING_SUBSCRIPTION = -176,
228  ERR__ASSIGN_PARTITIONS = -175,
230  ERR__REVOKE_PARTITIONS = -174,
232  ERR__CONFLICT = -173,
234  ERR__STATE = -172,
236  ERR__UNKNOWN_PROTOCOL = -171,
238  ERR__NOT_IMPLEMENTED = -170,
240  ERR__AUTHENTICATION = -169,
242  ERR__NO_OFFSET = -168,
244  ERR__OUTDATED = -167,
246  ERR__TIMED_OUT_QUEUE = -166,
248  ERR__UNSUPPORTED_FEATURE = -165,
250  ERR__WAIT_CACHE = -164,
252  ERR__INTR = -163,
254  ERR__KEY_SERIALIZATION = -162,
256  ERR__VALUE_SERIALIZATION = -161,
258  ERR__KEY_DESERIALIZATION = -160,
260  ERR__VALUE_DESERIALIZATION = -159,
262  ERR__PARTIAL = -158,
264  ERR__READ_ONLY = -157,
266  ERR__NOENT = -156,
268  ERR__UNDERFLOW = -155,
270  ERR__INVALID_TYPE = -154,
272  ERR__RETRY = -153,
274  ERR__PURGE_QUEUE = -152,
276  ERR__PURGE_INFLIGHT = -151,
278  ERR__FATAL = -150,
280  ERR__INCONSISTENT = -149,
282  ERR__GAPLESS_GUARANTEE = -148,
284  ERR__MAX_POLL_EXCEEDED = -147,
286  ERR__UNKNOWN_BROKER = -146,
288  ERR__NOT_CONFIGURED = -145,
290  ERR__FENCED = -144,
292  ERR__APPLICATION = -143,
293 
295  ERR__END = -100,
296 
297  /* Kafka broker errors: */
299  ERR_UNKNOWN = -1,
301  ERR_NO_ERROR = 0,
303  ERR_OFFSET_OUT_OF_RANGE = 1,
305  ERR_INVALID_MSG = 2,
307  ERR_UNKNOWN_TOPIC_OR_PART = 3,
309  ERR_INVALID_MSG_SIZE = 4,
311  ERR_LEADER_NOT_AVAILABLE = 5,
313  ERR_NOT_LEADER_FOR_PARTITION = 6,
315  ERR_REQUEST_TIMED_OUT = 7,
317  ERR_BROKER_NOT_AVAILABLE = 8,
319  ERR_REPLICA_NOT_AVAILABLE = 9,
321  ERR_MSG_SIZE_TOO_LARGE = 10,
323  ERR_STALE_CTRL_EPOCH = 11,
325  ERR_OFFSET_METADATA_TOO_LARGE = 12,
327  ERR_NETWORK_EXCEPTION = 13,
329  ERR_COORDINATOR_LOAD_IN_PROGRESS = 14,
331 #define ERR_GROUP_LOAD_IN_PROGRESS ERR_COORDINATOR_LOAD_IN_PROGRESS
332 
333  ERR_COORDINATOR_NOT_AVAILABLE = 15,
335 #define ERR_GROUP_COORDINATOR_NOT_AVAILABLE ERR_COORDINATOR_NOT_AVAILABLE
336 
337  ERR_NOT_COORDINATOR = 16,
339 #define ERR_NOT_COORDINATOR_FOR_GROUP ERR_NOT_COORDINATOR
340 
341  ERR_TOPIC_EXCEPTION = 17,
343  ERR_RECORD_LIST_TOO_LARGE = 18,
345  ERR_NOT_ENOUGH_REPLICAS = 19,
347  ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
349  ERR_INVALID_REQUIRED_ACKS = 21,
351  ERR_ILLEGAL_GENERATION = 22,
353  ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
355  ERR_INVALID_GROUP_ID = 24,
357  ERR_UNKNOWN_MEMBER_ID = 25,
359  ERR_INVALID_SESSION_TIMEOUT = 26,
361  ERR_REBALANCE_IN_PROGRESS = 27,
363  ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
365  ERR_TOPIC_AUTHORIZATION_FAILED = 29,
367  ERR_GROUP_AUTHORIZATION_FAILED = 30,
369  ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
371  ERR_INVALID_TIMESTAMP = 32,
373  ERR_UNSUPPORTED_SASL_MECHANISM = 33,
375  ERR_ILLEGAL_SASL_STATE = 34,
377  ERR_UNSUPPORTED_VERSION = 35,
379  ERR_TOPIC_ALREADY_EXISTS = 36,
381  ERR_INVALID_PARTITIONS = 37,
383  ERR_INVALID_REPLICATION_FACTOR = 38,
385  ERR_INVALID_REPLICA_ASSIGNMENT = 39,
387  ERR_INVALID_CONFIG = 40,
389  ERR_NOT_CONTROLLER = 41,
391  ERR_INVALID_REQUEST = 42,
393  ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
395  ERR_POLICY_VIOLATION = 44,
397  ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
399  ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
401  ERR_INVALID_PRODUCER_EPOCH = 47,
403  ERR_INVALID_TXN_STATE = 48,
406  ERR_INVALID_PRODUCER_ID_MAPPING = 49,
409  ERR_INVALID_TRANSACTION_TIMEOUT = 50,
412  ERR_CONCURRENT_TRANSACTIONS = 51,
416  ERR_TRANSACTION_COORDINATOR_FENCED = 52,
418  ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
420  ERR_SECURITY_DISABLED = 54,
422  ERR_OPERATION_NOT_ATTEMPTED = 55,
424  ERR_KAFKA_STORAGE_ERROR = 56,
426  ERR_LOG_DIR_NOT_FOUND = 57,
428  ERR_SASL_AUTHENTICATION_FAILED = 58,
430  ERR_UNKNOWN_PRODUCER_ID = 59,
432  ERR_REASSIGNMENT_IN_PROGRESS = 60,
434  ERR_DELEGATION_TOKEN_AUTH_DISABLED = 61,
436  ERR_DELEGATION_TOKEN_NOT_FOUND = 62,
438  ERR_DELEGATION_TOKEN_OWNER_MISMATCH = 63,
440  ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64,
442  ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65,
444  ERR_DELEGATION_TOKEN_EXPIRED = 66,
446  ERR_INVALID_PRINCIPAL_TYPE = 67,
448  ERR_NON_EMPTY_GROUP = 68,
450  ERR_GROUP_ID_NOT_FOUND = 69,
452  ERR_FETCH_SESSION_ID_NOT_FOUND = 70,
454  ERR_INVALID_FETCH_SESSION_EPOCH = 71,
456  ERR_LISTENER_NOT_FOUND = 72,
458  ERR_TOPIC_DELETION_DISABLED = 73,
460  ERR_FENCED_LEADER_EPOCH = 74,
462  ERR_UNKNOWN_LEADER_EPOCH = 75,
464  ERR_UNSUPPORTED_COMPRESSION_TYPE = 76,
466  ERR_STALE_BROKER_EPOCH = 77,
468  ERR_OFFSET_NOT_AVAILABLE = 78,
470  ERR_MEMBER_ID_REQUIRED = 79,
472  ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80,
474  ERR_GROUP_MAX_SIZE_REACHED = 81,
477  ERR_FENCED_INSTANCE_ID = 82,
479  ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE = 83,
481  ERR_ELECTION_NOT_NEEDED = 84,
483  ERR_NO_REASSIGNMENT_IN_PROGRESS = 85,
485  ERR_GROUP_SUBSCRIBED_TO_TOPIC = 86,
487  ERR_INVALID_RECORD = 87,
489  ERR_UNSTABLE_OFFSET_COMMIT = 88
490 };
491 
492 
496 RD_EXPORT
497 std::string err2str(RdKafka::ErrorCode err);
498 
499 
500 
505 enum CertificateType {
506  CERT_PUBLIC_KEY,
507  CERT_PRIVATE_KEY,
508  CERT_CA,
509  CERT__CNT
510 };
511 
516 enum CertificateEncoding {
517  CERT_ENC_PKCS12,
518  CERT_ENC_DER,
519  CERT_ENC_PEM,
520  CERT_ENC__CNT
521 };
522 
528 /* Forward declarations */
529 class Producer;
530 class Message;
531 class Headers;
532 class Queue;
533 class Event;
534 class Topic;
535 class TopicPartition;
536 class Metadata;
537 class KafkaConsumer;
557 class RD_EXPORT Error {
558  public:
559 
563  static Error *create (ErrorCode code, const std::string *errstr);
564 
565  virtual ~Error () { }
566 
567  /*
568  * Error accessor methods
569  */
570 
574  virtual ErrorCode code () const = 0;
575 
579  virtual std::string name () const = 0;
580 
584  virtual std::string str () const = 0;
585 
590  virtual bool is_fatal () const = 0;
591 
595  virtual bool is_retriable () const = 0;
596 
608  virtual bool txn_requires_abort () const = 0;
609 };
610 
642 class RD_EXPORT DeliveryReportCb {
643  public:
647  virtual void dr_cb (Message &message) = 0;
648 
649  virtual ~DeliveryReportCb() { }
650 };
651 
652 
680 class RD_EXPORT OAuthBearerTokenRefreshCb {
681  public:
685  virtual void oauthbearer_token_refresh_cb (const std::string &oauthbearer_config) = 0;
686 
687  virtual ~OAuthBearerTokenRefreshCb() { }
688 };
689 
690 
698 class RD_EXPORT PartitionerCb {
699  public:
716  virtual int32_t partitioner_cb (const Topic *topic,
717  const std::string *key,
718  int32_t partition_cnt,
719  void *msg_opaque) = 0;
720 
721  virtual ~PartitionerCb() { }
722 };
723 
729  public:
738  virtual int32_t partitioner_cb (const Topic *topic,
739  const void *key,
740  size_t key_len,
741  int32_t partition_cnt,
742  void *msg_opaque) = 0;
743 
744  virtual ~PartitionerKeyPointerCb() { }
745 };
746 
747 
748 
757 class RD_EXPORT EventCb {
758  public:
764  virtual void event_cb (Event &event) = 0;
765 
766  virtual ~EventCb() { }
767 };
768 
769 
773 class RD_EXPORT Event {
774  public:
776  enum Type {
780  EVENT_THROTTLE
781  };
782 
784  enum Severity {
785  EVENT_SEVERITY_EMERG = 0,
786  EVENT_SEVERITY_ALERT = 1,
787  EVENT_SEVERITY_CRITICAL = 2,
788  EVENT_SEVERITY_ERROR = 3,
789  EVENT_SEVERITY_WARNING = 4,
790  EVENT_SEVERITY_NOTICE = 5,
791  EVENT_SEVERITY_INFO = 6,
792  EVENT_SEVERITY_DEBUG = 7
793  };
794 
795  virtual ~Event () { }
796 
797  /*
798  * Event Accessor methods
799  */
800 
805  virtual Type type () const = 0;
806 
811  virtual ErrorCode err () const = 0;
812 
817  virtual Severity severity () const = 0;
818 
823  virtual std::string fac () const = 0;
824 
833  virtual std::string str () const = 0;
834 
839  virtual int throttle_time () const = 0;
840 
845  virtual std::string broker_name () const = 0;
846 
851  virtual int broker_id () const = 0;
852 
853 
859  virtual bool fatal () const = 0;
860 };
861 
862 
863 
867 class RD_EXPORT ConsumeCb {
868  public:
876  virtual void consume_cb (Message &message, void *opaque) = 0;
877 
878  virtual ~ConsumeCb() { }
879 };
880 
881 
885 class RD_EXPORT RebalanceCb {
886 public:
936  virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer,
937  RdKafka::ErrorCode err,
938  std::vector<TopicPartition*>&partitions) = 0;
939 
940  virtual ~RebalanceCb() { }
941 };
942 
943 
947 class RD_EXPORT OffsetCommitCb {
948 public:
964  virtual void offset_commit_cb(RdKafka::ErrorCode err,
965  std::vector<TopicPartition*>&offsets) = 0;
966 
967  virtual ~OffsetCommitCb() { }
968 };
969 
970 
971 
977 class RD_EXPORT SslCertificateVerifyCb {
978 public:
1015  virtual bool ssl_cert_verify_cb (const std::string &broker_name,
1016  int32_t broker_id,
1017  int *x509_error,
1018  int depth,
1019  const char *buf, size_t size,
1020  std::string &errstr) = 0;
1021 
1022  virtual ~SslCertificateVerifyCb() {}
1023 };
1024 
1025 
1030 class RD_EXPORT SocketCb {
1031  public:
1045  virtual int socket_cb (int domain, int type, int protocol) = 0;
1046 
1047  virtual ~SocketCb() { }
1048 };
1049 
1050 
1055 class RD_EXPORT OpenCb {
1056  public:
1068  virtual int open_cb (const std::string &path, int flags, int mode) = 0;
1069 
1070  virtual ~OpenCb() { }
1071 };
1072 
1073 
1094 class RD_EXPORT Conf {
1095  public:
1099  enum ConfType {
1101  CONF_TOPIC
1102  };
1103 
1107  enum ConfResult {
1108  CONF_UNKNOWN = -2,
1109  CONF_INVALID = -1,
1110  CONF_OK = 0
1111  };
1112 
1113 
1117  static Conf *create (ConfType type);
1118 
1119  virtual ~Conf () { }
1120 
1134  virtual Conf::ConfResult set (const std::string &name,
1135  const std::string &value,
1136  std::string &errstr) = 0;
1137 
1139  virtual Conf::ConfResult set (const std::string &name,
1140  DeliveryReportCb *dr_cb,
1141  std::string &errstr) = 0;
1142 
1144  virtual Conf::ConfResult set (const std::string &name,
1145  OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
1146  std::string &errstr) = 0;
1147 
1149  virtual Conf::ConfResult set (const std::string &name,
1150  EventCb *event_cb,
1151  std::string &errstr) = 0;
1152 
1160  virtual Conf::ConfResult set (const std::string &name,
1161  const Conf *topic_conf,
1162  std::string &errstr) = 0;
1163 
1165  virtual Conf::ConfResult set (const std::string &name,
1166  PartitionerCb *partitioner_cb,
1167  std::string &errstr) = 0;
1168 
1170  virtual Conf::ConfResult set (const std::string &name,
1171  PartitionerKeyPointerCb *partitioner_kp_cb,
1172  std::string &errstr) = 0;
1173 
1175  virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
1176  std::string &errstr) = 0;
1177 
1179  virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
1180  std::string &errstr) = 0;
1181 
1183  virtual Conf::ConfResult set (const std::string &name,
1184  RebalanceCb *rebalance_cb,
1185  std::string &errstr) = 0;
1186 
1188  virtual Conf::ConfResult set (const std::string &name,
1189  OffsetCommitCb *offset_commit_cb,
1190  std::string &errstr) = 0;
1191 
1196  virtual Conf::ConfResult set(const std::string &name,
1197  SslCertificateVerifyCb *ssl_cert_verify_cb,
1198  std::string &errstr) = 0;
1199 
1229  virtual Conf::ConfResult set_ssl_cert (RdKafka::CertificateType cert_type,
1230  RdKafka::CertificateEncoding cert_enc,
1231  const void *buffer, size_t size,
1232  std::string &errstr) = 0;
1233 
1245  virtual Conf::ConfResult get(const std::string &name,
1246  std::string &value) const = 0;
1247 
1251  virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
1252 
1256  virtual Conf::ConfResult get(
1257  OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0;
1258 
1262  virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
1263 
1267  virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
1268 
1272  virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
1273 
1277  virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
1278 
1282  virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
1283 
1287  virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
1288 
1292  virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
1293 
1295  virtual Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0;
1296 
1299  virtual std::list<std::string> *dump () = 0;
1300 
1302  virtual Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
1303  std::string &errstr) = 0;
1304 
1321  virtual struct rd_kafka_conf_s *c_ptr_global () = 0;
1322 
1340  virtual struct rd_kafka_topic_conf_s *c_ptr_topic () = 0;
1341 };
1342 
1355 class RD_EXPORT Handle {
1356  public:
1357  virtual ~Handle() { }
1358 
1360  virtual const std::string name () const = 0;
1361 
1370  virtual const std::string memberid () const = 0;
1371 
1372 
1395  virtual int poll (int timeout_ms) = 0;
1396 
1403  virtual int outq_len () = 0;
1404 
1420  virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt,
1421  Metadata **metadatap, int timeout_ms) = 0;
1422 
1423 
1433  virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;
1434 
1435 
1445  virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;
1446 
1447 
1456  virtual ErrorCode query_watermark_offsets (const std::string &topic,
1457  int32_t partition,
1458  int64_t *low, int64_t *high,
1459  int timeout_ms) = 0;
1460 
1478  virtual ErrorCode get_watermark_offsets (const std::string &topic,
1479  int32_t partition,
1480  int64_t *low, int64_t *high) = 0;
1481 
1482 
1504  virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
1505  int timeout_ms) = 0;
1506 
1507 
1516  virtual Queue *get_partition_queue (const TopicPartition *partition) = 0;
1517 
1534  virtual ErrorCode set_log_queue (Queue *queue) = 0;
1535 
1547  virtual void yield () = 0;
1548 
1563  virtual const std::string clusterid (int timeout_ms) = 0;
1564 
1581  virtual struct rd_kafka_s *c_ptr () = 0;
1582 
1598  virtual int32_t controllerid (int timeout_ms) = 0;
1599 
1600 
1622  virtual ErrorCode fatal_error (std::string &errstr) const = 0;
1623 
1663  virtual ErrorCode oauthbearer_set_token (const std::string &token_value,
1664  int64_t md_lifetime_ms,
1665  const std::string &md_principal_name,
1666  const std::list<std::string> &extensions,
1667  std::string &errstr) = 0;
1668 
1686  virtual ErrorCode oauthbearer_set_token_failure (const std::string &errstr) = 0;
1687 };
1688 
1689 
1708 class RD_EXPORT TopicPartition {
1709 public:
1715  static TopicPartition *create (const std::string &topic, int partition);
1716 
1723  static TopicPartition *create (const std::string &topic, int partition,
1724  int64_t offset);
1725 
1726  virtual ~TopicPartition() = 0;
1727 
1732  static void destroy (std::vector<TopicPartition*> &partitions);
1733 
1735  virtual const std::string &topic () const = 0;
1736 
1738  virtual int partition () const = 0;
1739 
1741  virtual int64_t offset () const = 0;
1742 
1744  virtual void set_offset (int64_t offset) = 0;
1745 
1747  virtual ErrorCode err () const = 0;
1748 };
1749 
1750 
1751 
1756 class RD_EXPORT Topic {
1757  public:
1764  static const int32_t PARTITION_UA;
1765 
1767  static const int64_t OFFSET_BEGINNING;
1768  static const int64_t OFFSET_END;
1769  static const int64_t OFFSET_STORED;
1770  static const int64_t OFFSET_INVALID;
1782  static Topic *create (Handle *base, const std::string &topic_str,
1783  const Conf *conf, std::string &errstr);
1784 
1785  virtual ~Topic () = 0;
1786 
1787 
1789  virtual const std::string name () const = 0;
1790 
1796  virtual bool partition_available (int32_t partition) const = 0;
1797 
1809  virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;
1810 
1827  virtual struct rd_kafka_topic_s *c_ptr () = 0;
1828 };
1829 
1830 
1852 class RD_EXPORT MessageTimestamp {
1853 public:
1858  MSG_TIMESTAMP_LOG_APPEND_TIME
1859  };
1860 
1862  int64_t timestamp;
1863 };
1864 
1865 
1875 class RD_EXPORT Headers {
1876 public:
1877  virtual ~Headers() = 0;
1878 
1887  class Header {
1888  public:
1899  Header(const std::string &key,
1900  const void *value,
1901  size_t value_size):
1902  key_(key), err_(ERR_NO_ERROR), value_size_(value_size) {
1903  value_ = copy_value(value, value_size);
1904  }
1905 
1919  Header(const std::string &key,
1920  const void *value,
1921  size_t value_size,
1922  const RdKafka::ErrorCode err):
1923  key_(key), err_(err), value_(NULL), value_size_(value_size) {
1924  if (err == ERR_NO_ERROR)
1925  value_ = copy_value(value, value_size);
1926  }
1927 
1933  Header(const Header &other):
1934  key_(other.key_), err_(other.err_), value_size_(other.value_size_) {
1935  value_ = copy_value(other.value_, value_size_);
1936  }
1937 
1943  Header& operator=(const Header &other)
1944  {
1945  if (&other == this) {
1946  return *this;
1947  }
1948 
1949  key_ = other.key_;
1950  err_ = other.err_;
1951  value_size_ = other.value_size_;
1952 
1953  if (value_ != NULL)
1954  free(value_);
1955 
1956  value_ = copy_value(other.value_, value_size_);
1957 
1958  return *this;
1959  }
1960 
1961  ~Header() {
1962  if (value_ != NULL)
1963  free(value_);
1964  }
1965 
1967  std::string key() const {
1968  return key_;
1969  }
1970 
1972  const void *value() const {
1973  return value_;
1974  }
1975 
1978  const char *value_string() const {
1979  return static_cast<const char *>(value_);
1980  }
1981 
1983  size_t value_size() const {
1984  return value_size_;
1985  }
1986 
1988  RdKafka::ErrorCode err() const {
1989  return err_;
1990  }
1991 
1992  private:
1993  char *copy_value(const void *value, size_t value_size) {
1994  if (!value)
1995  return NULL;
1996 
1997  char *dest = (char *)malloc(value_size + 1);
1998  memcpy(dest, (const char *)value, value_size);
1999  dest[value_size] = '\0';
2000 
2001  return dest;
2002  }
2003 
2004  std::string key_;
2005  RdKafka::ErrorCode err_;
2006  char *value_;
2007  size_t value_size_;
2008  void *operator new(size_t); /* Prevent dynamic allocation */
2009  };
2010 
2016  static Headers *create();
2017 
2026  static Headers *create(const std::vector<Header> &headers);
2027 
2037  virtual ErrorCode add(const std::string &key, const void *value,
2038  size_t value_size) = 0;
2039 
2050  virtual ErrorCode add(const std::string &key, const std::string &value) = 0;
2051 
2061  virtual ErrorCode add(const Header &header) = 0;
2062 
2070  virtual ErrorCode remove(const std::string &key) = 0;
2071 
2081  virtual std::vector<Header> get(const std::string &key) const = 0;
2082 
2093  virtual Header get_last(const std::string &key) const = 0;
2094 
2100  virtual std::vector<Header> get_all() const = 0;
2101 
2105  virtual size_t size() const = 0;
2106 };
2107 
2108 
2120 class RD_EXPORT Message {
2121  public:
2124  enum Status {
2128  MSG_STATUS_NOT_PERSISTED = 0,
2129 
2133  MSG_STATUS_POSSIBLY_PERSISTED = 1,
2134 
2138  MSG_STATUS_PERSISTED = 2,
2139  };
2140 
2148  virtual std::string errstr() const = 0;
2149 
2151  virtual ErrorCode err () const = 0;
2152 
2157  virtual Topic *topic () const = 0;
2158 
2160  virtual std::string topic_name () const = 0;
2161 
2163  virtual int32_t partition () const = 0;
2164 
2166  virtual void *payload () const = 0 ;
2167 
2169  virtual size_t len () const = 0;
2170 
2172  virtual const std::string *key () const = 0;
2173 
2175  virtual const void *key_pointer () const = 0 ;
2176 
2178  virtual size_t key_len () const = 0;
2179 
2181  virtual int64_t offset () const = 0;
2182 
2184  virtual MessageTimestamp timestamp () const = 0;
2185 
2187  virtual void *msg_opaque () const = 0;
2188 
2189  virtual ~Message () = 0;
2190 
2193  virtual int64_t latency () const = 0;
2194 
2211  virtual struct rd_kafka_message_s *c_ptr () = 0;
2212 
2216  virtual Status status () const = 0;
2217 
2222  virtual RdKafka::Headers *headers () = 0;
2223 
2230  virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0;
2231 
2234  virtual int32_t broker_id () const = 0;
2235 };
2236 
2260 class RD_EXPORT Queue {
2261  public:
2265  static Queue *create (Handle *handle);
2266 
2277  virtual ErrorCode forward (Queue *dst) = 0;
2278 
2279 
2291  virtual Message *consume (int timeout_ms) = 0;
2292 
2300  virtual int poll (int timeout_ms) = 0;
2301 
2302  virtual ~Queue () = 0;
2303 
2319  virtual void io_event_enable (int fd, const void *payload, size_t size) = 0;
2320 };
2321 
2335 class RD_EXPORT ConsumerGroupMetadata {
2336 public:
2337  virtual ~ConsumerGroupMetadata () = 0;
2338 };
2339 
2357 class RD_EXPORT KafkaConsumer : public virtual Handle {
2358 public:
2370  static KafkaConsumer *create (const Conf *conf, std::string &errstr);
2371 
2372  virtual ~KafkaConsumer () = 0;
2373 
2374 
2377  virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;
2378 
2381  virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;
2382 
2417  virtual ErrorCode subscribe (const std::vector<std::string> &topics) = 0;
2418 
2420  virtual ErrorCode unsubscribe () = 0;
2421 
2428  virtual ErrorCode assign (const std::vector<TopicPartition*> &partitions) = 0;
2429 
2433  virtual ErrorCode unassign () = 0;
2434 
2459  virtual Message *consume (int timeout_ms) = 0;
2460 
2474  virtual ErrorCode commitSync () = 0;
2475 
2481  virtual ErrorCode commitAsync () = 0;
2482 
2492  virtual ErrorCode commitSync (Message *message) = 0;
2493 
2503  virtual ErrorCode commitAsync (Message *message) = 0;
2504 
2514  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
2515 
2525  virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
2526 
2537  virtual ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) = 0;
2538 
2549  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
2550  OffsetCommitCb *offset_commit_cb) = 0;
2551 
2552 
2553 
2554 
2563  virtual ErrorCode committed (std::vector<TopicPartition*> &partitions,
2564  int timeout_ms) = 0;
2565 
2574  virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0;
2575 
2576 
2599  virtual ErrorCode close () = 0;
2600 
2601 
2619  virtual ErrorCode seek (const TopicPartition &partition, int timeout_ms) = 0;
2620 
2621 
2639  virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0;
2640 
2641 
2652  virtual ConsumerGroupMetadata *groupMetadata () = 0;
2653 
2654 
2655 };
2656 
2657 
2672 class RD_EXPORT Consumer : public virtual Handle {
2673  public:
2684  static Consumer *create (const Conf *conf, std::string &errstr);
2685 
2686  virtual ~Consumer () = 0;
2687 
2688 
2708  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset) = 0;
2709 
2716  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
2717  Queue *queue) = 0;
2718 
2728  virtual ErrorCode stop (Topic *topic, int32_t partition) = 0;
2729 
2744  virtual ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
2745  int timeout_ms) = 0;
2746 
2764  virtual Message *consume (Topic *topic, int32_t partition,
2765  int timeout_ms) = 0;
2766 
2788  virtual Message *consume (Queue *queue, int timeout_ms) = 0;
2789 
2809  virtual int consume_callback (Topic *topic, int32_t partition,
2810  int timeout_ms,
2811  ConsumeCb *consume_cb,
2812  void *opaque) = 0;
2813 
2820  virtual int consume_callback (Queue *queue, int timeout_ms,
2821  RdKafka::ConsumeCb *consume_cb,
2822  void *opaque) = 0;
2823 
2833  static int64_t OffsetTail(int64_t offset);
2834 };
2835 
2849 class RD_EXPORT Producer : public virtual Handle {
2850  public:
2861  static Producer *create (const Conf *conf, std::string &errstr);
2862 
2863 
2864  virtual ~Producer () = 0;
2865 
2871  enum {
2872  RK_MSG_FREE = 0x1,
2875  RK_MSG_COPY = 0x2,
2880  RK_MSG_BLOCK = 0x4
2897  /* For backwards compatibility: */
2898 #ifndef MSG_COPY /* defined in sys/msg.h */
2899  ,
2902  MSG_FREE = RK_MSG_FREE,
2903  MSG_COPY = RK_MSG_COPY
2904 #endif
2905 
2906  };
2907 
2964  virtual ErrorCode produce (Topic *topic, int32_t partition,
2965  int msgflags,
2966  void *payload, size_t len,
2967  const std::string *key,
2968  void *msg_opaque) = 0;
2969 
2974  virtual ErrorCode produce (Topic *topic, int32_t partition,
2975  int msgflags,
2976  void *payload, size_t len,
2977  const void *key, size_t key_len,
2978  void *msg_opaque) = 0;
2979 
2986  virtual ErrorCode produce (const std::string topic_name, int32_t partition,
2987  int msgflags,
2988  void *payload, size_t len,
2989  const void *key, size_t key_len,
2990  int64_t timestamp, void *msg_opaque) = 0;
2991 
2999  virtual ErrorCode produce (const std::string topic_name, int32_t partition,
3000  int msgflags,
3001  void *payload, size_t len,
3002  const void *key, size_t key_len,
3003  int64_t timestamp,
3004  RdKafka::Headers *headers,
3005  void *msg_opaque) = 0;
3006 
3007 
3012  virtual ErrorCode produce (Topic *topic, int32_t partition,
3013  const std::vector<char> *payload,
3014  const std::vector<char> *key,
3015  void *msg_opaque) = 0;
3016 
3017 
3030  virtual ErrorCode flush (int timeout_ms) = 0;
3031 
3032 
3060  virtual ErrorCode purge (int purge_flags) = 0;
3061 
3065  enum {
3066  PURGE_QUEUE = 0x1,
3068  PURGE_INFLIGHT = 0x2,
3075  PURGE_NON_BLOCKING = 0x4 /* Don't wait for background queue
3076  * purging to finish. */
3077  };
3078 
3105  virtual Error *init_transactions (int timeout_ms) = 0;
3106 
3107 
3120  virtual Error *begin_transaction () = 0;
3121 
3168  virtual Error *send_offsets_to_transaction (
3169  const std::vector<TopicPartition*> &offsets,
3170  const ConsumerGroupMetadata *group_metadata,
3171  int timeout_ms) = 0;
3172 
3194  virtual Error *commit_transaction (int timeout_ms) = 0;
3195 
3219  virtual Error *abort_transaction (int timeout_ms) = 0;
3220 
3222 };
3223 
3238  public:
3240  virtual int32_t id() const = 0;
3241 
3243  virtual const std::string host() const = 0;
3244 
3246  virtual int port() const = 0;
3247 
3248  virtual ~BrokerMetadata() = 0;
3249 };
3250 
3251 
3252 
3257  public:
3259  typedef std::vector<int32_t> ReplicasVector;
3261  typedef std::vector<int32_t> ISRSVector;
3262 
3264  typedef ReplicasVector::const_iterator ReplicasIterator;
3266  typedef ISRSVector::const_iterator ISRSIterator;
3267 
3268 
3270  virtual int32_t id() const = 0;
3271 
3273  virtual ErrorCode err() const = 0;
3274 
3276  virtual int32_t leader() const = 0;
3277 
3279  virtual const std::vector<int32_t> *replicas() const = 0;
3280 
3284  virtual const std::vector<int32_t> *isrs() const = 0;
3285 
3286  virtual ~PartitionMetadata() = 0;
3287 };
3288 
3289 
3290 
3295  public:
3297  typedef std::vector<const PartitionMetadata*> PartitionMetadataVector;
3299  typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
3300 
3302  virtual const std::string topic() const = 0;
3303 
3305  virtual const PartitionMetadataVector *partitions() const = 0;
3306 
3308  virtual ErrorCode err() const = 0;
3309 
3310  virtual ~TopicMetadata() = 0;
3311 };
3312 
3313 
3317 class Metadata {
3318  public:
3320  typedef std::vector<const BrokerMetadata*> BrokerMetadataVector;
3322  typedef std::vector<const TopicMetadata*> TopicMetadataVector;
3323 
3325  typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
3327  typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
3328 
3329 
3335  virtual const BrokerMetadataVector *brokers() const = 0;
3336 
3342  virtual const TopicMetadataVector *topics() const = 0;
3343 
3345  virtual int32_t orig_broker_id() const = 0;
3346 
3348  virtual const std::string orig_broker_name() const = 0;
3349 
3350  virtual ~Metadata() = 0;
3351 };
3352 
3355 }
3356 
3357 #endif /* _RDKAFKACPP_H_ */
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *...
virtual ErrorCode err() const =0
std::string key() const
Definition: rdkafkacpp.h:1967
virtual const std::string orig_broker_name() const =0
Broker (name) originating this metadata.
ISRSVector::const_iterator ISRSIterator
ISRs iterator.
Definition: rdkafkacpp.h:3266
Header(const std::string &key, const void *value, size_t value_size, const RdKafka::ErrorCode err)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:1919
Header object.
Definition: rdkafkacpp.h:1887
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:2357
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:1767
ConfType
Configuration object type.
Definition: rdkafkacpp.h:1099
Partitioner callback class.
Definition: rdkafkacpp.h:698
virtual int port() const =0
Type
Event type.
Definition: rdkafkacpp.h:776
const char * value_string() const
Definition: rdkafkacpp.h:1978
virtual int32_t orig_broker_id() const =0
Broker (id) originating this metadata.
size_t value_size() const
Definition: rdkafkacpp.h:1983
virtual const std::string host() const =0
SASL/OAUTHBEARER token refresh callback class.
Definition: rdkafkacpp.h:680
std::vector< const PartitionMetadata * > PartitionMetadataVector
Partitions.
Definition: rdkafkacpp.h:3297
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:1764
Header(const Header &other)
Copy constructor.
Definition: rdkafkacpp.h:1933
Message object.
Definition: rdkafkacpp.h:2120
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:784
Event callback class.
Definition: rdkafkacpp.h:757
Headers object.
Definition: rdkafkacpp.h:1875
ReplicasVector::const_iterator ReplicasIterator
Replicas iterator.
Definition: rdkafkacpp.h:3264
Status
Message persistence status can be used by the application to find out if a produced message was persi...
Definition: rdkafkacpp.h:2124
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:885
std::vector< int32_t > ReplicasVector
Replicas.
Definition: rdkafkacpp.h:3259
virtual const std::string topic() const =0
int64_t timestamp
Definition: rdkafkacpp.h:1862
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:728
virtual int32_t id() const =0
BrokerMetadataVector::const_iterator BrokerMetadataIterator
Brokers iterator.
Definition: rdkafkacpp.h:3325
ConsumerGroupMetadata holds a consumer instance's group metadata state.
Definition: rdkafkacpp.h:2335
Definition: rdkafkacpp.h:779
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:1770
virtual const PartitionMetadataVector * partitions() const =0
virtual const BrokerMetadataVector * brokers() const =0
Broker list.
RdKafka::ErrorCode err() const
Definition: rdkafkacpp.h:1988
Topic handle.
Definition: rdkafkacpp.h:1756
Metadata: Partition information.
Definition: rdkafkacpp.h:3256
Producer.
Definition: rdkafkacpp.h:2849
Metadata: Topic information.
Definition: rdkafkacpp.h:3294
Definition: rdkafkacpp.h:1100
Definition: rdkafkacpp.h:777
std::vector< const BrokerMetadata * > BrokerMetadataVector
Brokers.
Definition: rdkafkacpp.h:3320
ConfResult
RdKafka::Conf::Set() result code.
Definition: rdkafkacpp.h:1107
Definition: rdkafkacpp.h:93
Queue interface.
Definition: rdkafkacpp.h:2260
MessageTimestampType type
Definition: rdkafkacpp.h:1861
Message timestamp object.
Definition: rdkafkacpp.h:1852
std::vector< int32_t > ISRSVector
ISRs (In-Sync-Replicas)
Definition: rdkafkacpp.h:3261
virtual const TopicMetadataVector * topics() const =0
Topic list.
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:1768
PartitionMetadataVector::const_iterator PartitionMetadataIterator
Partitions iterator.
Definition: rdkafkacpp.h:3299
Portability: OpenCb callback class
Definition: rdkafkacpp.h:1055
SSL broker certificate verification class.
Definition: rdkafkacpp.h:977
Metadata: Broker information.
Definition: rdkafkacpp.h:3237
Definition: rdkafkacpp.h:778
virtual int32_t leader() const =0
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:1769
Header(const std::string &key, const void *value, size_t value_size)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:1899
Configuration interface.
Definition: rdkafkacpp.h:1094
Offset Commit callback class.
Definition: rdkafkacpp.h:947
virtual const std::vector< int32_t > * replicas() const =0
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:1355
std::vector< const TopicMetadata * > TopicMetadataVector
Topics.
Definition: rdkafkacpp.h:3322
Topic+Partition.
Definition: rdkafkacpp.h:1708
The Error class is used as a return value from APIs to propagate an error. The error consists of an e...
Definition: rdkafkacpp.h:557
Consume callback class.
Definition: rdkafkacpp.h:867
virtual const std::vector< int32_t > * isrs() const =0
Simple Consumer (legacy)
Definition: rdkafkacpp.h:2672
MessageTimestampType
Definition: rdkafkacpp.h:1855
TopicMetadataVector::const_iterator TopicMetadataIterator
Topics iterator.
Definition: rdkafkacpp.h:3327
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:773
Metadata container.
Definition: rdkafkacpp.h:3317
virtual ErrorCode err() const =0
Header & operator=(const Header &other)
Assignment operator.
Definition: rdkafkacpp.h:1943
Portability: SocketCb callback class
Definition: rdkafkacpp.h:1030
Delivery Report callback class.
Definition: rdkafkacpp.h:642
const void * value() const
Definition: rdkafkacpp.h:1972
virtual int32_t id() const =0