
Apache Flink With Kafka - Consumer and Producer

By Preetdeep Kumar · Apr. 02, 20 · Tutorial

Overview

Apache Flink provides various connectors to integrate with other systems. In this article, I will share an example of consuming records from Kafka through FlinkKafkaConsumer and producing records to Kafka using FlinkKafkaProducer.

Setup

I installed Kafka locally and created two topics, TOPIC-IN and TOPIC-OUT.

Shell

# Create two topics
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TOPIC-IN
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TOPIC-OUT

# List all topics
./bin/kafka-topics.sh --zookeeper localhost:2181 --list
TOPIC-IN
TOPIC-OUT
__consumer_offsets


I wrote a very simple NumberGenerator, which generates a number every second and sends it to TOPIC-IN using a KafkaProducer object. The code for both is available on GitHub.
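
The NumberGenerator class itself is not listed in this article. Purely as an illustration, here is a minimal sketch of what such a generator could look like, assuming a plain KafkaProducer<String, String> and a constructor matching the new NumberGenerator(p, TOPIC_IN) call used later; the actual implementation is in the GitHub repository.

Java

import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch of the NumberGenerator described above, not the author's actual code.
public class NumberGenerator extends Thread
{
    private final KafkaProducer<String, String> producer;
    private final String topic;

    public NumberGenerator(KafkaProducer<String, String> producer, String topic)
    {
        this.producer = producer;
        this.topic = topic;
    }

    @Override
    public void run()
    {
        try
        {
            for (int i = 1; ; i++)
            {
                // send "[1]", "[2]", ... under a fixed key, one record per second
                producer.send(new ProducerRecord<>(topic, "myKey", "[" + i + "]"));
                TimeUnit.SECONDS.sleep(1);
            }
        }
        catch (InterruptedException e)
        {
            producer.close();
        }
    }
}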

A sample run produces the following output:

Shell

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC-IN --property print.key=true --from-beginning
myKey   [1]
myKey   [2]
myKey   [3]
myKey   [4]
myKey   [5]
myKey   [6]
myKey   [7]
myKey   [8]
myKey   [9]


FlinkKafkaConnector Example

First, define a FlinkKafkaConsumer, as shown below:

Java

1   String TOPIC_IN = "TOPIC-IN";
2   String TOPIC_OUT = "TOPIC-OUT";
3   String BOOTSTRAP_SERVER = "localhost:9092";
4
5   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
6
7   // to use allowed lateness and timestamp from kafka message
8   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
9
10  Properties props = new Properties();
11  props.put("bootstrap.servers", BOOTSTRAP_SERVER);
12  props.put("client.id", "flink-kafka-example");
13
14  // consumer to get both key/values per Topic
15  FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new MySchema(), props);
16
17  // for allowing Flink to handle late elements
18  kafkaConsumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<KafkaRecord>()
19  {
20      @Override
21      public long extractAscendingTimestamp(KafkaRecord record)
22      {
23          return record.timestamp;
24      }
25  });
26
27  kafkaConsumer.setStartFromLatest();


Line #5: Get a local Flink StreamExecutionEnvironment.

Line #8: Required to use the timestamps carried in the Kafka messages (event time). Otherwise, Flink falls back to the system clock (processing time).

Line #15: Create a FlinkKafkaConsumer<> object, which will act as a source for us. The class "KafkaRecord" is a wrapper for the key and value coming from Kafka, and the MySchema class implements KafkaDeserializationSchema<KafkaRecord> to provide deserialization logic used by Flink to convert byte[] from Kafka to String. 

The code for both classes is available on GitHub. A custom deserialization schema is required because I want to read both the key and the value of the Kafka messages.

Line #18 to #25: Required to tell Flink where to read the event timestamp from. This timestamp is used to decide the start and end of each tumbling time window.
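
The KafkaRecord and MySchema classes are not shown in this article either. Below is a rough sketch of what they could look like, assuming both key and value are plain strings; the actual code is in the GitHub repository.

Java

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// KafkaRecord.java — hypothetical wrapper for a Kafka key/value pair plus its timestamp.
public class KafkaRecord
{
    public String key;
    public String value;
    public long timestamp;
}

// MySchema.java — hypothetical deserialization schema turning the raw byte[] key/value into a KafkaRecord.
public class MySchema implements KafkaDeserializationSchema<KafkaRecord>
{
    @Override
    public boolean isEndOfStream(KafkaRecord nextElement)
    {
        return false;
    }

    @Override
    public KafkaRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception
    {
        KafkaRecord record = new KafkaRecord();
        record.key = consumerRecord.key() == null ? "" : new String(consumerRecord.key());
        record.value = new String(consumerRecord.value());
        record.timestamp = consumerRecord.timestamp();
        return record;
    }

    @Override
    public TypeInformation<KafkaRecord> getProducedType()
    {
        return TypeInformation.of(KafkaRecord.class);
    }
}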

After this, we need to define a FlinkKafkaProducer, as shown below:

Java

Properties prodProps = new Properties();
prodProps.put("bootstrap.servers", BOOTSTRAP_SERVER);

FlinkKafkaProducer<KafkaRecord> kafkaProducer =
        new FlinkKafkaProducer<KafkaRecord>(TOPIC_OUT,
                (record, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT, record.key.getBytes(), record.value.getBytes()),
                prodProps,
                Semantic.EXACTLY_ONCE);
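
One note on Semantic.EXACTLY_ONCE: it makes the sink write through Kafka transactions, and (if I recall the defaults correctly) Flink's default producer transaction timeout of one hour exceeds the broker's default transaction.max.timeout.ms of 15 minutes, which can make the job fail at startup. A common workaround, assuming default broker settings, is to lower the producer-side timeout; downstream consumers then only see a window's output once its transaction commits if they read with isolation.level=read_committed.

Java

// EXACTLY_ONCE relies on Kafka transactions; keep the producer's transaction timeout
// at or below the broker's transaction.max.timeout.ms (15 minutes by default).
prodProps.put("transaction.timeout.ms", "900000");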


Now, we can define a simple pipeline, as shown below:

Java

1   DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);
2
3   stream
4     .filter((record) -> record.value != null && !record.value.isEmpty())
5     .keyBy(record -> record.key)
6     .timeWindow(Time.seconds(5))
7     .allowedLateness(Time.seconds(1))
8     .reduce(new ReduceFunction<KafkaRecord>()
9     {
10        KafkaRecord result = new KafkaRecord();
11
12        @Override
13        public KafkaRecord reduce(KafkaRecord record1, KafkaRecord record2) throws Exception
14        {
15            result.key = "outKey";
16            result.value = record1.value + record2.value;
17            return result;
18        }
19    })
20    .addSink(kafkaProducer);
21
22  // produce a number as string every second
23  new NumberGenerator(p, TOPIC_IN).start();
24
25  // start flink
26  env.execute();


Line #1: Create a DataStream from the FlinkKafkaConsumer object as the source.

Line #3 to #4: Filter out null and empty values coming from Kafka.

Line #5: Key the Flink stream based on the key present in Kafka messages. This will logically partition the stream and allow parallel execution on a per-key basis.

Line #6 to #7: Define a time window of five seconds and allow an extra second of lateness.

Line #8 to #19: Simple reduction logic that concatenates all the numbers collected in a window and emits the result under a new key, "outKey".

Line #20: Send the output of each window to the FlinkKafkaProducer sink created above.

Line #23: Start the NumberGenerator.

Line #26: Start the Flink execution environment.

A sample run of this code produces the following output:

Shell

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC-OUT --property print.key=true --from-beginning
outKey  [5][6]
outKey  [7][8][9][10][11]
outKey  [12][13][14][15][16]
outKey  [17][18][19][20][21]
outKey  [22][23][24][25][26]


Conclusion

The above example shows how to use Flink's Kafka connector API to both consume messages from and produce messages to Kafka, and how to plug in a custom deserialization schema when reading data from Kafka.
