As the world takes a multi-layered approach to data storage, there is a shift in how organizations transform data. It has driven businesses to integrate extract, load, and transform (ELT) tools with medallion architecture. This trend reshapes how data is ingested and transformed across lines of business as well as by departmental users, data analysts, and C-level executives. Applying rigid data transformation rules and making data available for your teams through a data warehouse may not fully address your business's evolving and exploratory data integration needs. Depending on the volume of data your organization produces and the rate at which it's generated, processing data without knowing the consumption patterns could prove to be costly. Case-based data transformation could be more economically viable as more ad-hoc queries and analyses pop up every day. That doesn't mean you store the data in raw form. Instead, it's necessary to add several layers of transformations, enrichments, and business rules to optimize cost and performance.

How Business Requirements Shape Database Technologies

Let's take a quick look at how data management has evolved. We started with cloud data warehouses. Traditional data warehouses, such as those based on relational database systems, have been the backbone of enterprise data management for years. They're optimized for structured data and typically used for business intelligence and reporting.

Then, we moved into the era of data lakes. Data lakes became popular for handling large volumes of structured and unstructured data. They offer flexibility in data storage and processing, allowing organizations to store raw and diverse data in its native format.

Now, we have data lakehouses. The concept of a data lakehouse emerged as a response to some of the challenges associated with data lakes, such as data quality, data governance, and the need for transactional capabilities. The data lakehouse architecture aims to combine the best features of both worlds: the scalability and flexibility of data lakes with the reliability and performance of data warehouses. Technologies like Delta Lake and Apache Iceberg have contributed to developing the data lakehouse concept by adding transactional capabilities and schema enforcement to data lakes. To fully leverage the potential of this evolving architecture, we recommend implementing best practices, one of which is medallion architecture.

What Is Medallion Architecture?

Medallion architecture is gaining popularity in the data world. Unlike traditional data lake architectures, where raw or unstructured data is stored without any schema enforcement or strong consistency guarantees, medallion architecture introduces structure and organization to your data. It allows you to add schema evolution capabilities to your datasets stored in Delta Lake, making it easier to query and analyze your data effectively. One of the reasons why medallion architecture is gaining popularity is its ability to handle large volumes of diverse data types in a scalable manner. By leveraging Delta Lake's transactional capabilities, you can ensure atomicity, consistency, isolation, and durability (ACID) for your operations on massive datasets. But how does it differ from traditional data lake architectures? While both approaches store raw or unstructured data, medallion architecture introduces a systematic method of defining bronze, silver, and gold layers within a data lake.
This allows data engineers to curate the right data for the right audience. It also makes it easier for users to query and analyze their datasets without sacrificing performance or reliability. (The diagram referenced here shows an SQL ELT, or native, reference architecture.) This is why medallion architecture is taking off in the realm of Delta Lake and cloud data warehousing. It offers a powerful combination of scalability, reliability, performance, and structured storage for your valuable datasets. Now, let's explore how data processing needs to change along with changes in architecture.

Why Is ELT the Right Data Transformation Process for Medallion Architecture?

As defined, there are several layers in medallion data architecture. Data is progressively processed and refined as it moves through these layers. Using traditional extract, transform, load (ETL) can be inefficient, as it often requires moving data out of your data warehouse or lakehouse for every transformation that is needed for the next processing level. Instead, a more effective approach is to use pushdown technology, where you push the code into the target/source, allowing data processing to occur where the data resides. In this case, only the data transformation code moves, not the data itself. ELT further streamlines this process by enabling you to transform the data as many times as you want, making your system more efficient. With ELT, you reduce the burden on the source system, as the data is ingested only once into the data lake/lakehouse. The optimal design of ELT provides several competitive advantages. It enables you to process large volumes of data more rapidly, accelerating insights and decision-making. It also reduces operational costs by minimizing unnecessary data movement across networks and systems.

Necessary Data Integration Capabilities to Run ELT in Medallion Data Architecture

A few specific data integration capabilities will enable you to run ELT successfully in medallion data architecture. These include:

Parallel processing at scale: This is a must-have technology that runs your ELT code on multiple machines at the same time, which can improve the performance of your data jobs. A processing engine like Spark can handle massive datasets by scaling out to larger clusters and adding more nodes. The scheduler distributes tasks to worker nodes, balancing workload and maximizing resource utilization.

Data loading patterns: Make sure the tool doesn't solely rely on batch load but also supports real-time streaming and full and incremental loads. Change data capture (CDC) and schema drift are the most frequently used features when transferring data from the sources to a data lakehouse.

Optimized data processing at each stage: Medallion architecture is a system for logically organizing data within a data lakehouse. Each layer in a medallion architecture serves a different purpose, and transformations are applied while considering the security boundaries, retention rules, user access policies, required latency, and business impact level. You should be able to process data at a granular level, optimizing it for the next step of logical data processing.

Preview code during design time: This capability allows you to see the results of your ELT code before you run it, which can help you catch errors and ensure your code is doing what you want it to do.

Multi-cloud support: Don't limit your integration capabilities to one particular ecosystem. Ensure you can run your data pipeline jobs in multiple cloud environments, such as Snowflake, Databricks, Amazon Web Services (AWS), Microsoft Azure, and Google Cloud.

Auto tuning: This lets your ELT tool automatically adjust the settings of your jobs to improve their performance. The tool should be AI-enabled to collect runtime statistics and adjust execution strategies based on data characteristics.

Flexible transformation: ELT tools must allow flexible transformation logic, as transformations can be performed using a wider range of tools and techniques, including SQL, Python, and Spark. This can be useful if you need to perform complex transformations not supported by SQL.

Combine SQL code with proprietary code: This enables you to use both SQL code and proprietary code in a single ELT pipeline. This can be useful if you need to perform tasks not supported by SQL. For example, you might use SQL to query the database and retrieve the data, then write a Python function to implement a data quality check, applying custom logic to identify and address any data issues.

End-to-end workflow: This capability provides a visual interface that allows you to design and execute your ELT jobs as part of a complete task flow. The tool should enable the scheduling and orchestration of a set of tasks, starting from extracting data to triggering downstream tasks, managing dependencies, and enabling data observability.

Security, access control, and masking capabilities: These allow you to control who has access to your data and to mask sensitive data, which is important for protecting your data from unauthorized access.

The ability to implement DataOps: This gives you the ability to integrate your ELT processes with your DevOps processes, which can help you improve the quality and reliability of your data.

Easy switching between ETL and ELT: This makes it easy for you to switch between ETL and ELT processing, which can be useful if you need to change your data processing strategy.

Data transformation as code: This makes it possible for you to store your ELT code in a repository, making it easier to manage and version your code.

Advanced transformation: When ELT becomes your mainstream way of processing data, you need to ensure you don't have to run to different tools for complex transformations.

Data quality: This gives you the ability to identify and address data quality issues early in your ELT process, which can help you improve the quality of your data.

Integration with data lineage and governance: This capability allows you to track the origins and transformations of your data, helping you ensure your data complies with your data governance policies. The ELT tool should integrate seamlessly with your data lineage and governance frameworks to maintain data traceability, consistency, and security. It should provide visibility into data origins, transformations, and destinations, enabling effective data auditing and compliance with data governance policies.

Next Steps

It's crucial for your business to select an ELT tool that's high-performing and also compatible with medallion data architecture. This will enhance data integration capabilities, allowing you to fully utilize the structured, layered approach of medallion architecture. This alignment will give your business a competitive edge by efficiently handling large data volumes, improving scalability, streamlining workflow processes, and achieving cost efficiencies.
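To make the bronze, silver, and gold layering concrete, here is a minimal PySpark sketch of an ELT-style flow in which the transformations run where the data already lives. It is only an illustration under assumptions: a Spark session with Delta Lake configured, existing bronze/silver/gold databases, and hypothetical storage paths, table names, and columns.

Python
from pyspark.sql import SparkSession, functions as F

# Assumes Delta Lake is configured for this Spark session and that the
# bronze/silver/gold databases already exist; names and paths are illustrative.
spark = SparkSession.builder.appName("medallion-elt").getOrCreate()

# Bronze: land raw events as-is, adding only ingestion metadata.
bronze = (spark.read.json("s3://lake/raw/sales/")
          .withColumn("_ingested_at", F.current_timestamp()))
bronze.write.format("delta").mode("append").saveAsTable("bronze.sales_events")

# Silver: cleanse, deduplicate, and enforce types for analysts.
silver = (spark.table("bronze.sales_events")
          .dropDuplicates(["transaction_id"])
          .withColumn("sale_amount", F.col("sale_amount").cast("double"))
          .filter(F.col("sale_amount") > 0))
silver.write.format("delta").mode("overwrite").saveAsTable("silver.sales_events")

# Gold: business-level aggregates served to BI tools and executives.
gold = (spark.table("silver.sales_events")
        .groupBy("product_id")
        .agg(F.sum("sale_amount").alias("total_sales")))
gold.write.format("delta").mode("overwrite").saveAsTable("gold.sales_by_product")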
When data is analyzed and processed in real time, it can yield insights and actionable information either instantly or with very little delay from the time the data is collected. The capacity to collect, handle, and retain user-generated data in real time is crucial for many applications in today's data-driven environment. There are various ways to emphasize the significance of real-time data analytics, such as timely decision-making, IoT and sensor data processing, enhanced customer experience, proactive problem resolution, fraud detection and security, and more. Rising to the demands of diverse real-time data processing scenarios, Apache Kafka has established itself as a dependable and scalable event streaming platform. In short, event streaming is the process of collecting data in real time as streams of events from event sources such as databases, sensors, and software applications. Apache Flink, in turn, is a powerful open-source framework for real-time data processing and analytics. For situations where quick insights and minimal processing latency are critical, it offers a consistent and effective platform for managing continuous streams of data.

Why Apache Flink and Kafka Work Well Together

Apache Flink joined the Apache Incubator in 2014, and since its inception, Apache Kafka has consistently stood out as one of the most frequently utilized connectors for Apache Flink. Flink itself is a data processing engine that hosts the processing logic but does not provide any storage mechanism. Since Kafka provides the foundational layer for storing streaming data, Flink can serve as the computational layer for Kafka, powering real-time applications and pipelines. Over the years, Apache Flink has developed first-rate support for building Kafka-based applications. By utilizing the numerous services and resources offered by the Kafka ecosystem, Flink applications are able to leverage Kafka as both a source and a sink, and Flink natively supports widely used formats such as Avro, JSON, and Protobuf.

Apache Kafka proved to be an especially suitable match for Apache Flink. Unlike alternative systems such as ActiveMQ, RabbitMQ, etc., Kafka offers the capability to durably store data streams indefinitely, enabling consumers to read streams in parallel and replay them as necessary. This aligns with Flink's distributed processing model and fulfills a crucial requirement for Flink's fault tolerance mechanism.

Other external systems can be linked to Flink's Table API and SQL programs to read and write batch and streaming tables. A table source provides access to data kept in external systems such as a file system, database, message queue, or key-value store. For Kafka, each record is simply a key-value pair. Events are appended to a Flink table in much the same way as they are appended to a Kafka topic: a topic in a Kafka cluster is mapped to a table in Flink. In Flink, each table is equivalent to a stream of events that describe the changes being made to that particular table. The table is automatically updated when a query refers to it, and its results are either materialized or emitted.
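As a small illustration of the topic-to-table mapping described above, the following PyFlink sketch declares a Kafka-backed table and runs a continuous query over it. It assumes a PyFlink installation with the Kafka SQL connector jar available; the broker address, topic, and columns are placeholders.

Python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Assumes the flink-sql-connector-kafka jar is on the classpath
# (for example, via the pipeline.jars configuration).
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Map a Kafka topic to a dynamic table: each record appends a row.
t_env.execute_sql("""
    CREATE TABLE pos_events (
        user_ts TIMESTAMP(3),
        id BIGINT,
        message STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'pos_events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'flink-demo',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")

# A continuous query over the table; results are emitted as new events arrive.
t_env.execute_sql(
    "SELECT id, COUNT(*) AS event_count FROM pos_events GROUP BY id"
).print()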
Conclusion

By combining Apache Flink and Apache Kafka, we can create reliable, scalable, low-latency real-time data processing pipelines with fault tolerance and exactly-once processing guarantees. For businesses wishing to instantly evaluate and gain insights from streaming data, this combination provides a potent option.
Imagine the challenge of rapidly aggregating and processing large volumes of data from multiple point-of-sale (POS) systems for real-time analysis. In such scenarios, where speed is critical, the combination of Kafka and ClickHouse emerges as a formidable solution. Kafka excels in handling high-throughput data streams, while ClickHouse distinguishes itself with its lightning-fast data processing capabilities. Together, they form a powerful duo, enabling the construction of top-level analytical dashboards that provide timely and comprehensive insights. This article explores how Kafka and ClickHouse can be integrated to transform vast data streams into valuable, real-time analytics. This diagram depicts the initial, straightforward approach: data flows directly from POS systems to ClickHouse for storage and analysis. While seemingly effective, this somewhat naive solution may not scale well or handle the complexities of real-time processing demands, setting the stage for a more robust solution involving Kafka. Understanding Challenges With Data Insertion in ClickHouse The simple approach may lead you to a common pitfall or first “deadly sin” when starting with ClickHouse (for more details, see Common Getting Started Issues with ClickHouse). You'll likely encounter this error during data insertion, visible in ClickHouse logs, or as a response to an INSERT request. Grasping this issue requires knowledge of ClickHouse's architecture, specifically the concept of a “part.” Ingesting data into ClickHouse is most effective when managed with precision, leveraging both speed and parallelism. The optimal process, as illustrated, involves batched insertions coordinated by a central system rather than individual, uncontrolled streams of data: In the optimal setup, data is inserted by a primary controller that manages the flow, adjusting speed dynamically while maintaining controlled parallelism. This method ensures efficient data processing and is in line with ClickHouse's optimal performance conditions. That's why, in practice, it's common to introduce a buffer before ClickHouse: Kafka now enters the architecture as the preferred solution for data buffering. It effortlessly bridges the gap between the data producers and ClickHouse, offering a robust intermediary that enhances data handling. Here's how the revised architecture integrates Kafka: The integration of Kafka requires additional coding to funnel data from POS systems and then to ClickHouse. This element of the architecture, while powerful and scalable, introduces complexity that we'll explore in more detail later in the article. Data Transfer From Kafka to ClickHouse The critical stages in delivering data from Kafka to ClickHouse involve reading Kafka topics, transforming data into ClickHouse-compatible formats, and writing this formatted data into ClickHouse tables. The trade-off here lies in deciding where to perform each stage. Each stage has its own resource demands: Reading stage: This initial phase consumes CPU and network bandwidth to pull in data from Kafka topics. Transformation process: Transforming the data demands CPU and memory usage. It's a straightforward resource-utilization phase, where computational power reshapes the data to fit ClickHouse's specifications. Writing stage: The final act involves writing data into ClickHouse tables, which also requires CPU power and network bandwidth. It's a routine process, ensuring the data finds its place in ClickHouse's storage with allocated resources. 
When integrating, it's essential to balance these resource uses. Now, let's examine the various methodologies for linking Kafka with ClickHouse.

ClickHouse's Kafka Engine

Leverage the Kafka engine within ClickHouse to directly ingest data into your tables. The high-level process is visually represented in the accompanying diagram. Considering this scenario, the POS terminals are designed to output data in a structured JSON format, with each entry separated by a new line. This format is typically well-suited for log ingestion and processing systems.

Shell
{"user_ts": "SOME_DATE", "id": 123, "message": "SOME_TEXT"}
{"user_ts": "SOME_DATE", "id": 1234, "message": "SOME_TEXT"}

To set up the Kafka Engine in ClickHouse, we begin by creating a topic wrapper within ClickHouse using the Kafka Engine. This is outlined in the provided example file: example kafka_stream_engine.sql

SQL
-- Clickhouse queue wrapper
CREATE TABLE demo_events_queue ON CLUSTER '{cluster}' (
    -- JSON content schema
    user_ts String,
    id UInt64,
    message String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'KAFKA_HOST:9091',
         kafka_topic_list = 'TOPIC_NAME',
         kafka_group_name = 'uniq_group_id',
         kafka_format = 'JSONEachRow'; -- Format

In this query, three things are established:

Schema of data: A ClickHouse table structure containing three defined columns;
Data format: The format specified as 'JSONEachRow,' suitable for parsing newline-delimited JSON data;
Kafka configuration: The settings for the Kafka host and topic are included to link the data source with ClickHouse.

The next step in the setup involves defining a target table in ClickHouse that will store the processed data:

/example_projects/clickstream/kafka_stream_engine.sql#L12-L23
SQL
-- Table to store data
CREATE TABLE demo_events_table ON CLUSTER '{cluster}' (
    topic String,
    offset UInt64,
    partition UInt64,
    timestamp DateTime64,
    user_ts DateTime64,
    id UInt64,
    message String
) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/demo_events_table', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (topic, partition, offset);

This table will be structured using the ReplicatedMergeTree engine, providing robust data storage capabilities. In addition to the base data columns, the table will include additional columns derived from the metadata provided by Kafka Engine, allowing for enriched data storage and query capabilities.

/example_projects/clickstream/kafka_stream_engine.sql#L25-L34
SQL
-- Delivery pipeline
CREATE MATERIALIZED VIEW readings_queue_mv TO demo_events_table AS
SELECT
    -- kafka engine virtual column
    _topic as topic,
    _offset as offset,
    _partition as partition,
    _timestamp as timestamp,
    -- example of complex date parsing
    toDateTime64(parseDateTimeBestEffort(user_ts), 6, 'UTC') as user_ts,
    id,
    message
FROM demo_events_queue;

The final step in the integration process is to set up a materialized view within ClickHouse that bridges the Kafka Engine table with your target table. This materialized view will automate the transformation and insertion of data from the Kafka topic into the target table, ensuring that the data is consistently and efficiently processed and stored. Together, these configurations facilitate a robust pipeline for streaming data from Kafka into ClickHouse:

Shell
SELECT count(*)
FROM demo_events_table

Query id: f2637cee-67a6-4598-b160-b5791566d2d8

┌─count()─┐
│    6502 │
└─────────┘

1 row in set. Elapsed: 0.336 sec.
When deploying all three stages—reading, transforming, and writing—within ClickHouse, this setup is generally more manageable for smaller datasets. However, it might not scale as effectively for larger workloads. Under heavy load, ClickHouse typically gives preference to query operations, which could lead to increased latency in data delivery as resource competition arises. This is an important consideration when planning for high-volume data handling. While the Kafka Engine integration is functional, it presents several challenges: Offset management: Malformed data in Kafka can stall ClickHouse, requiring manual intervention to delete offsets, a task that can be demanding. Limited observability: Since operations are internal to ClickHouse, monitoring is more complex and relies heavily on analyzing ClickHouse logs to understand system behavior. Scalability concerns: Executing parsing and reading inside ClickHouse could hinder scaling during high loads, which might lead to resource contention issues. Utilizing Kafka Connect Kafka Connect offers a different approach by reallocating the complexities of data management from ClickHouse to Kafka. This strategy involves a careful decision about where to handle the data management intricacies. In this model, tasks such as reading, parsing, and writing are managed within Kafka Connect, which operates as part of the Kafka system. The trade-offs in this approach are similar but involve shifting the processing burden from the data storage side to the buffering side. An illustrative example is provided here to demonstrate how to establish this connection. Opting for an External Writer The External Writer approach represents a premium solution, offering superior performance for those who are ready to invest more. It typically involves an external system responsible for data handling, positioned outside of both the buffer (Kafka) and storage (ClickHouse) layers. This setup might even be co-located with the data-producing sources, offering a high level of efficiency and speed. The following diagram simplifies this configuration, showcasing how external writers can be integrated into the data pipeline: External Writer via DoubleCloud For implementing an external writer approach using DoubleCloud Transfer, the setup involves two primary components: source and destination endpoints, along with the transfer mechanism itself. This configuration is efficiently managed using Terraform. A key element in this setup is the parser rule for the Source endpoint, which is critical for accurately interpreting and processing the incoming data stream. The details of this configuration are outlined here: /example_projects/clickstream/transfer.tf#L16-L43 ProtoBuf parser { json { schema { fields { field { name = "user_ts" type = "datetime" key = false required = false } field { name = "id" type = "uint64" key = false required = false } field { name = "message" type = "utf8" key = false required = false } } } null_keys_allowed = false add_rest_column = true } } The parser configuration in DoubleCloud Transfer plays a similar role to the DDL specifications in ClickHouse. It's crucial for ensuring the correct interpretation and processing of incoming data. 
Once the source endpoint is established, the next step is to add the target database, which is typically more straightforward: /example_projects/clickstream/transfer.tf#L54-L63 ProtoBuf clickhouse_target { clickhouse_cleanup_policy = "DROP" connection { address { cluster_id = doublecloud_clickhouse_cluster.target-clickhouse.id } database = "default" user = "admin" } } Finally, link them together into a transfer: /example_projects/clickstream/transfer.tf#L67-L75 ProtoBuf resource "doublecloud_transfer" "clickstream-transfer" { name = "clickstream-transfer" project_id = var.project_id source = doublecloud_transfer_endpoint.clickstream-source[count.index].id target = doublecloud_transfer_endpoint.clickstream-target[count.index].id type = "INCREMENT_ONLY" activated = true } With the completion of these steps, your data delivery system utilizing DoubleCloud Transfer is now operational. This setup ensures a seamless flow of data from the source to the target database, effectively managing the entire process. DoubleCloud's EL(t) engine, Transfer, integrates Queue Engine to ClickHouse delivery, tackling common challenges: Automated offset management: Transfer automates the handling of corrupt data through unparsed tables, minimizing the need for manual offset management. Enhanced observability: Unlike limited monitoring in ClickHouse, Transfer provides dedicated dashboards and alerts for real-time insights into delivery metrics like data lag, row counts, and bytes delivered. Dynamic scalability: Transfer's delivery jobs, hosted on Kubernetes, EC2, or GCP, allow for scalable operations independent of ClickHouse. Transfer additionally provides out-of-the-box features to enhance its functionality: Automatic schema evolution: Automatically synchronizes backward-compatible schema changes with the target storage. Automatic dead-letter queue: Efficiently manages corrupt data by redirecting it to a designated Dead-Letter Queue (DLQ) within the ClickHouse table. External Writer via Clickpipes ClickPipes offers a simplified and efficient solution for ingesting data from various sources. Its user-friendly interface allows for quick setup with minimal effort. Engineered for high-demand scenarios, ClickPipes boasts a robust, scalable architecture that delivers consistent performance and reliability. While it shares similarities with DoubleCloud Transfer in terms of functionality, ClickPipes does not support automatic schema evolution. For detailed setup instructions, a comprehensive guide is available here. Conclusion In this article, we've explored various methodologies for integrating Kafka with ClickHouse, focusing on options like the Kafka Engine, Kafka Connect, DoubleCloud Transfer, and ClickPipes. Each of these approaches offers unique strengths and considerations tailored to different data processing requirements and operational scales. From resource management to system scalability, the selection of the right approach is crucial for optimal data handling. To further explore the synergy of Kafka and ClickHouse, consider diving into the DoubleCloud stack. They provide insightful Terraform examples that can be a great starting point for those looking to implement these powerful tools in their data processing workflows. For more detailed guidance, check out their Terraform exemplars.
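For teams that would rather build a lightweight external writer themselves than adopt a managed service, the sketch below shows the general shape of such a component in Python. It is only a minimal illustration, assuming the kafka-python and clickhouse-connect client libraries, a hypothetical pos_events topic, and a simplified three-column target table; a production writer would add error handling, retries, and a dead-letter path.

Python
import json

import clickhouse_connect                   # ClickHouse HTTP client
from kafka import KafkaConsumer             # Kafka consumer client

consumer = KafkaConsumer(
    "pos_events",                            # hypothetical topic name
    bootstrap_servers="KAFKA_HOST:9092",
    group_id="clickhouse_writer",
    enable_auto_commit=False,                # commit only after a successful insert
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
ch = clickhouse_connect.get_client(host="CLICKHOUSE_HOST", username="default", password="")

BATCH_SIZE = 10_000
buffer = []

while True:
    # Accumulate a large batch before writing: ClickHouse strongly prefers
    # a few big inserts over many small ones.
    polled = consumer.poll(timeout_ms=1000, max_records=BATCH_SIZE)
    for records in polled.values():
        for record in records:
            buffer.append((record.value["user_ts"], record.value["id"], record.value["message"]))

    if buffer:
        ch.insert("pos_events_local",        # hypothetical target table
                  buffer,
                  column_names=["user_ts", "id", "message"])
        consumer.commit()                    # advance offsets only once the data is stored
        buffer.clear()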
Data deduplication is a technique used to eliminate duplicate records or rows from a dataset. Data deduplication holds significant importance in the Big Data world due to the scale and volume of data handled in Big Data environments. Here are some key reasons why data deduplication is crucial in the context of Big Data:

Storage efficiency: Big Data systems deal with massive amounts of data generated from various sources. Storing redundant data consumes a considerable amount of storage space. Data deduplication eliminates duplicate records, reducing storage requirements and optimizing storage efficiency.
Cost savings: Storing and managing large volumes of data can be expensive. By deduplicating data, organizations can reduce their storage costs significantly, leading to cost savings in infrastructure and maintenance.
Faster processing: When processing large datasets, data deduplication can improve data access times and query performance. With less redundant data to process, queries and analysis can be executed faster, enabling quicker insights and decision-making.
Data quality and consistency: Duplicate data can lead to data inconsistency and errors in analysis. By removing duplicates, data quality improves, ensuring that analytics and business intelligence reports are accurate and reliable.
Streamlining data workflows: Big Data workflows often involve data integration from multiple sources. Data deduplication simplifies the integration process by reducing the number of unique data records to be processed.
Enhanced data analytics: Big Data analytics and machine learning models can be more accurate when working with clean and deduplicated data. Eliminating duplicates ensures that algorithms aren't influenced by repeated data points.
Backup and disaster recovery: Data deduplication can also be valuable in backup and disaster recovery scenarios. Storing unique data in backups reduces backup storage requirements and improves recovery times.
Data privacy and compliance: In scenarios where sensitive data needs to be anonymized or pseudonymized for privacy and regulatory compliance, data deduplication can help maintain data privacy while minimizing the risk of reidentification through duplicates.
Data governance: Maintaining clean and deduplicated data supports effective data governance practices. It ensures that data is consistent, well-maintained, and adheres to data governance policies.
Scalability: Data deduplication techniques need to be scalable to handle the vast amount of data generated in Big Data environments. Efficient deduplication algorithms and distributed computing can ensure scalability and high-performance processing.

In Hive, data deduplication can be achieved using various methods, such as using the DISTINCT keyword, GROUP BY, or window functions like ROW_NUMBER(). Let's explore these methods with code examples and a real-time scenario. Suppose we have a Hive table called sales_data with the columns transaction_id, product_id, sale_amount, and sale_date, created through the following DDL:

SQL
CREATE TABLE sales_data (
    transaction_id INT,
    product_id STRING,
    sale_amount DOUBLE,
    sale_date DATE);

Let's assume we have a dataset with sales data for an online store. The dataset may contain duplicate records due to various reasons, such as system glitches, data integration issues, or multiple entries for the same transaction.
transaction_id  product_id  sale_amount  sale_date
1               ABC123      100          2023-07-01
2               DEF456      50           2023-07-02
3               GHI789      75           2023-07-03
4               ABC123      100          2023-07-01
5               XYZ999      200          2023-07-04

Method 1: Using DISTINCT Keyword

The DISTINCT keyword is used to eliminate duplicate rows from the result set.

SQL
-- Create a new table with deduplicated records
CREATE TABLE sales_data_dedup AS
SELECT DISTINCT transaction_id, product_id, sale_amount, sale_date
FROM sales_data;

transaction_id  product_id  sale_amount  sale_date
1               ABC123      100          2023-07-01
2               DEF456      50           2023-07-02
3               GHI789      75           2023-07-03
5               XYZ999      200          2023-07-04

In Hive, the DISTINCT keyword internally uses a hash-based aggregation to identify and remove duplicates. This can be resource-intensive for large datasets and may not be an efficient method!

Method 2: Using GROUP BY

We can use GROUP BY to group the records based on specific columns and then apply aggregate functions like SUM, COUNT, etc. In this case, we'll use GROUP BY to remove duplicates. To use GROUP BY to remove duplicates, we can select the unique rows by grouping the data based on the columns that define uniqueness and then select the first row from each group. The "first row" can be chosen arbitrarily since we are not using any aggregate functions. Here's the Hive query using GROUP BY to remove duplicates:

SQL
-- Create a new table with deduplicated records using GROUP BY
CREATE TABLE sales_data_dedup AS
SELECT transaction_id, product_id, sale_amount, sale_date
FROM sales_data
GROUP BY transaction_id, product_id, sale_amount, sale_date;

transaction_id  product_id  sale_amount  sale_date
1               ABC123      100          2023-07-01
2               DEF456      50           2023-07-02
3               GHI789      75           2023-07-03
5               XYZ999      200          2023-07-04

In this example, we grouped the rows based on the columns transaction_id, product_id, sale_amount, and sale_date. As a result, the duplicates with the same values in these columns were combined into groups, and then we selected the "first row" from each group, effectively removing the duplicates. It's important to note that when using GROUP BY to remove duplicates, the order of rows within each group is not guaranteed. If the order of rows is significant, consider using the ROW_NUMBER() window function to remove duplicates while maintaining the desired order.

Method 3: Using ROW_NUMBER() Window Function

The ROW_NUMBER() window function assigns a unique integer to each row based on the specified order. By using this function and selecting only rows with ROW_NUMBER() = 1, we can deduplicate the data.

SQL
-- Create a new table with deduplicated records using ROW_NUMBER()
CREATE TABLE sales_data_dedup AS
SELECT transaction_id, product_id, sale_amount, sale_date
FROM (
    SELECT transaction_id, product_id, sale_amount, sale_date,
           ROW_NUMBER() OVER (PARTITION BY transaction_id, product_id, sale_amount, sale_date
                              ORDER BY transaction_id) as row_num
    FROM sales_data
) t
WHERE row_num = 1;

In all the methods, we successfully deduplicated the sales data and created a new table sales_data_dedup containing unique records. Data deduplication is an essential step in data processing pipelines, as it helps in maintaining data quality, reduces storage costs, and improves query performance. In real-time scenarios, data deduplication can be applied to various datasets like customer data, transaction data, log files, etc., to ensure data consistency and efficiency.
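If the same table lives in a lakehouse accessed through Spark, the equivalent deduplication can be expressed with the DataFrame API. The following is a small PySpark sketch under the assumption of a Spark session with Hive metastore access; dropDuplicates mirrors the DISTINCT/GROUP BY methods, and the window version mirrors Method 3.

Python
from pyspark.sql import SparkSession, Window, functions as F

# Assumes a Spark session that can see the Hive table used above.
spark = SparkSession.builder.enableHiveSupport().appName("dedup").getOrCreate()
sales = spark.table("sales_data")

# Equivalent of the DISTINCT / GROUP BY approaches: exact-duplicate removal.
dedup_exact = sales.dropDuplicates(["transaction_id", "product_id", "sale_amount", "sale_date"])

# Equivalent of the ROW_NUMBER() approach: keep one row per key while
# controlling which row survives (here, the earliest sale_date).
w = Window.partitionBy("transaction_id").orderBy(F.col("sale_date").asc())
dedup_keyed = (sales.withColumn("row_num", F.row_number().over(w))
                    .filter(F.col("row_num") == 1)
                    .drop("row_num"))

dedup_keyed.write.mode("overwrite").saveAsTable("sales_data_dedup")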
In conclusion, data deduplication plays a vital role in the Big Data world by optimizing storage, improving data quality, enhancing data processing efficiency, and facilitating accurate analytics and decision-making. As organizations continue to deal with ever-growing volumes of data, data deduplication remains a critical aspect of managing and utilizing Big Data effectively.
In an era when data management is critical to business success, exponential data growth presents a number of challenges for technology departments, including DRAM density limitations and strict budget constraints. These issues are driving the adoption of memory tiering, a game-changing approach that alters how data is handled and stored. Non-Volatile Random Access Memory (NVRAM), which is becoming more affordable and popular, is one of the key technologies designed to work within a tiered memory architecture. This article will investigate the fundamentals of NVRAM, compare it to traditional solutions, and provide guidelines for writing efficient NVRAM algorithms. What Is NVRAM? Non-Volatile Random Access Memory, or NVRAM, is a type of memory that retains data even when the power is turned off. It combines the RAM (Random Access Memory) and ROM (Read Only Memory) properties, allowing data to be read and written quickly like RAM while also being retained when the power is turned off like ROM. It is available on Intel-based computers and employs 3D Xpoint, a revolutionary memory technology that strikes a balance between the high speed of RAM and the persistence of traditional storage, providing a new solution for high-speed, long-term data storage and processing. NVRAM is typically used for specific purposes such as system configuration data storage rather than as a general-purpose memory for running applications. NVRAM is used in a variety of memory modules, including: Dual In-line Memory Modules (DIMMs), which use NVRAM to store firmware data or provide persistent memory Solid State Drives (SSDs) that use NVRAM to store firmware, wear-leveling data, and sometimes to cache writes Motherboard Chipsets that use NVRAM to store BIOS or UEFI settings PCIe Cards that use NVRAM for high-speed data storage or caching Hybrid memory modules use NVRAM in addition to traditional RAM. What Are the Differences Between NVRAM and RAM? To gain a better understanding of the differences between NVRAM and RAM, it is necessary to review the concepts of both types of memory. RAM, or Random Access Memory, is a type of memory that can be read and written to in any order and is commonly used to store working data and machine code. RAM is "volatile" memory, which means it can store data until the computer is powered down. Unlike RAM, NVRAM can retain stored information, making it ideal for storing critical data that must persist across reboots. It may contain information such as system configurations, user settings, or application state. Apart from this critical distinction, these types of memory differ in other ways that define their advantages and disadvantages: Speed: While DRAM is fast, especially when it comes to accessing and writing data, its speed is typically lower than what NVRAM strives for. NVRAM, in addition to potentially competing with DRAM (Dynamic RAM) in terms of speed, offers the durability of traditional non-volatile memory. Energy consumption: NVRAM consumes less power than RAM/DRAM, owing to the fact that it does not require power to retain data, whereas the latter requires constant refreshing. Cost and availability: NVRAM may be more expensive and less widely available at first than established memory technologies such as RAM, which is widely available and available in a variety of price ranges. What Distinguishes NVRAM Algorithms? Because NVRAM allows for the direct storage of important bits of information (such as program settings) in memory, it becomes a game changer in the industry. 
The NVRAM algorithms are defined by several key characteristics: NVRAM provides new opportunities for developing recoverable algorithms, allowing for efficient recovery of a program's state following system or individual process failure. NVRAM frequently has faster read and write speeds than traditional magnetic disc drives or flash-based SSDs, making it suitable for high-performance computing and real-time tasks that require quick data access. Some types of NVRAM, such as flash memory, are prone to wear due to frequent rewrites. This necessitates the use of special wear-leveling algorithms, which distribute write operations evenly across the memory in order to extend its lifespan. Integrating NVRAM into systems necessitates taking into account its distinct characteristics, such as access speed and wear management. This could entail modifying existing algorithms and system architectures. How To Write NVRAM Algorithms Mutual Exclusion Algorithms Mutex (Mutual Exclusion) algorithms are designed to ensure that multiple processes can manage access to shared resources without conflict, even in the event of system crashes or power outages. The following are the key requirements for this type of algorithm: Mutual exclusion: It ensures that only one process or thread can access a critical section at the same time, preventing concurrent access to shared resources. Deadlock-free: This avoids situations in which processes are indefinitely waiting for each other to release resources, ensuring that programs run continuously. Starvation-free: This ensures that every process has access to the critical section, preventing indefinite delays for any process. Peterson's Algorithm for NVRAM Peterson's Algorithm is an example of an algorithm that can be adapted for NVRAM. In computer science, it is a concurrency control algorithm used to achieve mutual exclusion in multi-threading environments. It enables multiple processes to share a single-use resource without conflict while ensuring that only one process has access to the resource at any given time. In an NVRAM environment, Peterson's algorithm, which was originally designed for two processes, can be extended to support multiple processes (from 0 to n-1). Adapting Peterson's algorithm for NVRAM entails not only expanding it to support multiple processes but also incorporating mechanisms for post-failure recoverability. To adapt Peterson's algorithm for recoverability in NVRAM include specific recovery code that allows a process to re-enter the critical section after a crash. This might involve checking the state of shared variables or locks to determine the last known state before the crash. To write the algorithm, you must first complete the following steps: Initialization: In NVRAM, define shared variables (flag array, turn variable). Set these variables to their default values, indicating that no processes are currently in the critical section. Entry section: Each process that attempts to enter the critical section sets a flag in the NVRAM. After that, the process sets the turn variable to indicate its desire to enter the critical section. It examines the status of other processes' flags as well as the turn variable to see if it can enter the critical section. Critical section: Once inside, the process gets to work. NVRAM stores any state changes or operations that must be saved. Exit section: When the process completes its operations, it resets its flag in the NVRAM, indicating that it has exited the critical section. 
Recovery mechanism: Include code to handle crashes during entry or exit from the critical section. If a process fails in the entry section, it reads the state of competitors and determines whether to continue. If a process crashes in the exit section, it re-executes the entry section to ensure proper state updates. Handling process failures: Use logic to determine whether a failed process completed its operation in the critical section and take appropriate action. Tournament tree for process completion: Create a hierarchical tournament tree structure. Each process traverses this tree, running recovery and entry code at each level. If necessary, include an empty recovery code segment to indicate that the process is aware of its failure state. Nonblocking Algorithms Nonblocking algorithms are a type of concurrent programming algorithm that enables multiple threads to access and modify shared data without using locks or mutual exclusion mechanisms. These algorithms are intended to ensure that the failure or suspension of one thread does not prevent other threads from progressing. The following are the primary requirements of nonblocking algorithms: Nonblocking algorithms are frequently lock-free, which means that at least one thread makes progress in a finite number of steps even if other threads are delayed indefinitely. Wait-free: A more powerful type of nonblocking algorithm is wait-free, in which each thread is guaranteed to complete its operation in a finite number of steps, regardless of the activity of other threads. Obstruction-free: The most basic type of nonblocking algorithm, in which a thread can finish its operation in a finite number of steps if it eventually operates without interference from other threads. Linearizability is a key concept in concurrent programming that is associated with nonblocking algorithms. It ensures that all operations on shared resources (such as read, write, or update) appear to occur in a single, sequential order that corresponds to the actual order of operations in real time. Nonblocking Algorithm Example Let's take a look at the recoverable version of the CAS program, which is intended to make operations more resilient to failures. The use of a two-dimensional array is a key feature of this implementation. This array acts as a log or record, storing information about which process (or "who") wrote a value and when it happened. Such logging is essential in a recoverable system, particularly in NVRAM, where data persists despite system reboots or failures. The linearizability of operations, which ensures that operations appear to occur in a sequential order consistent with their actual execution, is a key feature of this algorithm. The CAS RECOVER function's evaluation order is critical for maintaining linearizability: If process p1 fails after a successful CAS operation and then recovers, evaluating the second part of the expression in CAS.RECOVER first can lead to non-linearizable execution. This is because another process, p2, could complete a CAS operation in the meantime, changing the state in a way that's not accounted for if p1 only checks the second part of the condition. Therefore, the first part of the condition (checking C=<p,new>) must be evaluated before the second part (checking if new is in R[p][1] to R[p][N]). 
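As a point of reference for the mutual exclusion discussion above, here is a compact Python sketch of the classic two-process Peterson's algorithm. It is only a conceptual illustration: the flag array and turn variable stand in for the NVRAM-resident shared state, and a recoverable variant would persist them and add the entry/exit recovery code described earlier.

Python
import threading

# Shared state; in the NVRAM setting described above, flag and turn would
# live in persistent memory so a recovering process can re-read them.
flag = [False, False]
turn = 0
counter = 0            # stands in for the shared resource guarded by the critical section
ITERATIONS = 50_000

def worker(i: int) -> None:
    global turn, counter
    other = 1 - i
    for _ in range(ITERATIONS):
        # Entry section: announce intent, then yield priority to the other process.
        flag[i] = True
        turn = other
        while flag[other] and turn == other:
            pass       # busy-wait until it is safe to enter
        # Critical section
        counter += 1
        # Exit section: clear the flag so the other process may proceed.
        flag[i] = False

threads = [threading.Thread(target=worker, args=(i,)) for i in (0, 1)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)          # expected: 2 * ITERATIONS if mutual exclusion held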
Conclusion This article delves into the fundamental concepts of NVRAM, a new type of memory, compares it to RAM, presents key requirements for mutex and nonblocking algorithms, and offers guidelines for developing efficient NVRAM algorithms.
Big data has become increasingly important in today's data-driven world. It refers to the massive amount of structured and unstructured data that is too large to be handled by traditional database systems. Companies across various industries rely on big data analytics to gain valuable insights and make informed business decisions. To efficiently process and analyze this vast amount of data, organizations need a robust and scalable architecture. One of the key components of an effective big data architecture is the real-time pipeline which enables the processing of data as it is generated allowing organizations to respond quickly to new information and changing market conditions. Real-time pipelines in big data architecture are designed to ingest, process, transform, and analyze data in near real-time, providing instant insights and enabling businesses to take immediate actions based on current information. These pipelines handle large volumes of data streams and move them through different stages to extract valuable insights. The architecture of a real-time big data pipeline typically consists of several components, including data sources, data ingestion, storage, processing, analysis, and visualization. Let's take a closer look at each of these components: 1. Data Sources: Data sources can be structured or unstructured and can include social media feeds, IoT devices, log files, sensors, customer transactions, and more. These data sources generate a continuous stream of data that needs to be processed in real time. 2. Data Ingestion: The data ingestion stage involves capturing and collecting data from various sources and making it available for processing. This process can include data extraction, transformation, and loading (ETL), data cleansing, and data validation. 3. Storage: Real-time pipelines require a storage system that can handle high-velocity data streams. Distributed file systems like Apache Hadoop Distributed File System (HDFS) or cloud-based object storage like Amazon S3 are commonly used to store incoming data. 4. Processing: In this stage, the collected data is processed in real-time to extract meaningful insights. Technologies like Apache Kafka, Apache Storm, or Apache Samza are often used for real-time stream processing, enabling the continuous processing of incoming data streams. 5. Analysis: Once the data is processed, it is ready for analysis. Complex event processing (CEP) frameworks like Apache Flink or Apache Spark Streaming can be used to detect patterns, correlations, anomalies, or other insights in real-time data. 6. Visualization: The final stage involves making the analyzed data easily understandable and accessible to the end-users. Data visualization tools like Tableau or Power BI can be used to create interactive dashboards, reports, or visual representations of the insights derived from real-time data. 
Here is a sample code for a real-time pipeline using big data technologies like Apache Kafka and Apache Spark:

How To Set Up Apache Kafka Producer:

Python
from kafka import KafkaProducer

# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Send messages to a Kafka topic
for i in range(10):
    producer.send('my_topic', value=str(i).encode('utf-8'))

# Close the producer
producer.close()

How To Set Up Apache Spark Consumer:

Python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a Spark context
sc = SparkContext(appName='Real-time Pipeline')

# Create a Streaming context with a batch interval of 1 second
ssc = StreamingContext(sc, 1)

# Read data from Kafka topic
kafka_params = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group_id',
    'auto.offset.reset': 'earliest'
}
kafka_stream = KafkaUtils.createDirectStream(ssc, ['my_topic'], kafkaParams=kafka_params)

# Process the incoming data
processed_stream = kafka_stream.map(lambda x: int(x[1])).filter(lambda x: x % 2 == 0)

# Print the processed data
processed_stream.pprint()

# Start the streaming context
ssc.start()
ssc.awaitTermination()

In this example, the producer sends messages to a Kafka topic 'my_topic'. The Spark consumer consumes the data from the topic, processes it (in this case, filters out odd numbers), and prints the processed data. This code sets up a real-time pipeline where the data is processed as it comes in. Make sure you have Apache Kafka and Apache Spark installed and running on your machine for this code to work.

Overall, a well-designed real-time big data pipeline architecture enables organizations to leverage the power of big data in making instant and data-driven decisions. By processing and analyzing data in real time, businesses can respond promptly to emerging trends, customer demands, or potential threats. Real-time pipelines empower organizations to gain a competitive edge and enhance their operational efficiency. However, building and maintaining a real-time big data pipeline architecture can be complex and challenging. Organizations need to consider factors like scalability, fault tolerance, data security, and regulatory compliance. Additionally, choosing the right technologies and tools that fit specific business requirements is essential for building an effective real-time big data pipeline.

Conclusion: Big data real-time pipeline architecture plays a crucial role in handling the vast amount of data generated by organizations today. By enabling real-time processing, analysis, and visualization of data, businesses can harness the power of big data and gain valuable insights to drive their success in today's evolving digital landscape.
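One caveat worth adding to the consumer example above: the DStream-based KafkaUtils API was removed from PySpark in Spark 3.0, so on current Spark releases the same pipeline is typically written with Structured Streaming. Below is a rough equivalent, assuming the spark-sql-kafka connector package is available on the cluster; topic and broker names are the same placeholders as before.

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Assumes the spark-sql-kafka-0-10 package is available to this session.
spark = SparkSession.builder.appName("realtime-pipeline").getOrCreate()

# Read the Kafka topic as an unbounded streaming DataFrame.
events = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "my_topic")
          .option("startingOffsets", "earliest")
          .load())

# Same logic as the DStream example: keep only the even numbers.
evens = (events.selectExpr("CAST(value AS STRING) AS value")
         .filter(col("value").cast("int") % 2 == 0))

# Print each micro-batch to the console.
query = (evens.writeStream
         .format("console")
         .outputMode("append")
         .start())
query.awaitTermination()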
Apache NiFi is an easy-to-use, powerful, highly available, and reliable system to process and distribute data. Made for data flow between source and target systems, it is a simple, robust tool to process data from various sources and targets (find more on GitHub). NiFi has three repositories:

FlowFile Repository: Stores the metadata of the FlowFiles during the active flow
Content Repository: Holds the actual content of the FlowFiles
Provenance Repository: Stores the snapshots of the FlowFiles in each processor; with that, it outlines a detailed data flow and the changes in each processor and allows an in-depth discovery of the chain of events

NiFi Registry is a stand-alone sub-project of NiFi that allows version control of NiFi. It allows saving FlowFile state and sharing FlowFiles between NiFi applications, and it is primarily used to version control the code written in NiFi.

General Setup and Usage

As data flows from the source to the target, the data and metadata of the FlowFile reside in the FlowFile and content repositories. NiFi stores all FlowFile content on disk to ensure resilience across restarts. It also provides backpressure to prevent data consumers/sources from overwhelming the system if the target is unable to keep up for some time. For example, ConsumeKafka receives data as a FlowFile in NiFi (through the ConsumeKafka processor). Say the target is another Kafka topic (or a Hive/SQL/Postgres table) after general filters, enrichments, etc. However, if the target is unavailable, or any code fails to work as expected (i.e., the filter code or enrichment code), the flow stops due to backpressure, and ConsumeKafka won't run. Fortunately, data loss does not occur because the data is present in the content repository, and once the issue is resolved, the data resumes flowing to the target. Most application use cases work well in this setup. However, some use cases may require a slightly different architecture than what traditional NiFi provides.

Use Cases

If a user knows that the data source they are receiving data from is both persistent and replayable, it might be more beneficial to skip persisting the data in NiFi (as FlowFiles in the content repository) and instead replay it from the source after a restart. This approach has multiple advantages. Firstly, data could be stored in memory instead of on disk, offering better performance and faster load times. Secondly, it enables seamless data transfer between machines without any loss. This can be achieved with the NiFi ExecuteStateless processor.

How To Set Up and Run

1. First, prepare the flow you want to set up. For example: ConsumeKafka receives the data as a FlowFile into the content repository, then the application code runs (general filters/enrichments, etc.) and publishes to another Kafka topic or writes to a Hive/SQL/Postgres table.
2. Say the code, which consumes a lot of resources on disk/CPU due to some filter/enrichment, can be converted to the ExecuteStateless process and run in memory. The flow looks like this: ConsumeKafka --> ExecuteStateless processor --> PublishKafka/PutHiveQL/PutDatabaseRecord.
3. When the stateless process fails, backpressure occurs, and data can be replayed after the issue is resolved. As this is executed in memory, it is faster compared to a conventional NiFi run.
4. Once the above code is ready (#2), keep it in a process group. Right-click and check the code into NiFi Registry to start version control.
5.
Now complete the full setup of the code: Drag the consumekafka and set up the configs like Kafka topic/SSL config/offset, etc. properties (considering the above example). Drag the execute stateless processor and follow step 7 below to configure. Connect this to the consumekafka processor and publishkafka processor as per the flow shown in #3. Drag publishKafka and set up the configs like Kafka topic/SSL config/any other properties like compression, etc. An important point to note: If this code uses any secrets, such as keystore/truststore passwords or database credentials, they should be configured within the processgroup for which the executestateless process is going to run. This should also be passed from the executestateless process as variables with the same name as to how the configuration is made inside the process group. 6. The screenshot below shows the configuration of the executestateless processor: Dataflow specification strategy: Use the NiFi registry Registry URL: Configured NiFi Registry URL Registry bucket: Specific bucket name where the code has been checked Flow name: The name of the flow where the code has been checked Input port: The name of the port where consumekafka is connecting (considering the above example); the process group should have an input port - if you have multiple inputs, give the names as comma-separated Failure port: In case of any failures, the actual code should have failure ports present and these FlowFiles can be reprocessed again. If you have multiple failure ports, give the names as comma-separated. 7. Based on the point mentioned in #6 above, add additional variables at the end of this as shown below for any of the secrets. Content storage strategy: change it to "store content on heap". Please note: One of the most impactful configuration options for the Processor is the configuration of the "Content Storage Strategy" property. For performance reasons, the processor can be configured to hold all FlowFiles in memory. This includes incoming FlowFiles, as well as intermediate and output FlowFiles. This can be a significant performance improvement but comes with a significant risk. The content is stored on NiFi's heap. This is the same heap that is shared by all other ExecuteStateless flows by NiFi's processors and the NiFi process itself. If the data is very large, it can quickly exhaust the heap, resulting in out-of-memory errors in NiFi. These, in turn, can result in poor performance, as well as instability of the NiFi process itself. For this reason, it is not recommended to use the "Store Content on Heap" option unless it is known that all FlowFiles will be small (less than a few MB). Also, in order to help safeguard against the case that the processor receives an unexpectedly large FlowFile, the "Max Input FlowFile Size" property must be configured when storing data on the heap. Alternatively, and by default, the "Content Storage Strategy" can be configured to store FlowFile content on disk. When this option is used, the content of all FlowFiles is stored in the configured Working Directory. It is important to note, however, that this data is not meant to be persisted across restarts. Instead, this simply provides the stateless engine with a way to avoid loading everything into memory. Upon restart, the data will be deleted instead of allowing FlowFiles to resume from where they left off (reference). 8. The final flow looks like this: Conclusion Stateless NiFi provides a different runtime engine than traditional NiFi. 
It is a single-threaded runtime engine in which data is not persisted across restarts, but it can be run with multiple threads; make sure to set up multiple threads according to the use case, as described below. As explained above in step 7, the performance implications should be considered. When designing a flow to use with Stateless, it is important to consider how the flow might receive its data and what it might do with the data once it is processed. The different options are as follows: The flow can fully encapsulate the source of data and all destinations: for example, it might have a ConsumeKafkaRecord processor, perform some processing, and then publish to another topic via PublishKafkaRecord. The flow can source data from some external source, possibly perform some processing, but not define the destination of the data: for example, the flow might consist of a ConsumeKafkaRecord processor and perform some filtering and transformation but stop short of publishing the data anywhere. Instead, it can transfer the data to an output port, which could then be used by ExecuteStateless to bring that data into the NiFi dataflow. A dataflow may not define where it receives its input from and instead just use an input port, so that any dataflow can be built to source data and then deliver it to this dataflow, which is responsible for preparing and delivering the data. Finally, the dataflow may define neither the source nor the destination of the data: it will be built to use an input port, perform some filtering/routing/transformation, and finally provide its processing results to an output port (reference). Both the traditional NiFi Runtime Engine and the Stateless NiFi Runtime Engine have their strengths and weaknesses. The ideal situation would be one in which users could easily choose which parts of their data flow run Stateless and which parts run in the traditional NiFi Runtime Engine. Additional Reference: NiFi ExecuteStateless
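To make the "replayable source" idea more concrete, here is a minimal sketch of the same pattern outside NiFi, using plain Kafka clients: nothing is persisted inside the pipeline itself, and offsets are committed only after the enriched record has been published, so a failure or restart simply replays the affected batch from the source topic. The broker address, topic names, filter, and enrichment below are hypothetical placeholders, not part of the NiFi flow described above.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class StatelessStylePipeline {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");     // assumption: local broker
        consumerProps.put("group.id", "stateless-style-pipeline");
        consumerProps.put("enable.auto.commit", "false");             // commit only after publishing succeeds
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(List.of("source-topic"));              // hypothetical topic names
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    if (record.value() == null || record.value().isBlank()) {
                        continue;                                     // placeholder filter step
                    }
                    String enriched = record.value().toUpperCase();   // placeholder enrichment step
                    producer.send(new ProducerRecord<>("target-topic", record.key(), enriched));
                }
                producer.flush();        // make sure everything reached the target
                consumer.commitSync();   // only now acknowledge the source; a crash before this replays the batch
            }
        }
    }
}

Because offsets are committed only after publishing, a crash can re-deliver records that were already sent, so the target must tolerate duplicates; that at-least-once trade-off is the same one the stateless approach accepts in exchange for skipping the content repository.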
Apache Kafka stands as a robust distributed streaming platform. However, like any system, it is imperative to proficiently monitor and control latency for optimal performance. Kafka consumer lag refers to the difference between the offset of the most recent message in a Kafka topic and the offset of the last message processed by a consumer. This lag arises when the consumer cannot match the pace at which new messages are produced and appended to the topic. Consumer lag in Kafka can manifest due to various factors. Several typical reasons for consumer lag are insufficient consumer capacity, slow consumer processing, and a high rate of message production. Additionally, complex data transformations, resource-intensive computations, or long-running operations within consumer applications can delay message processing. Poor network connectivity, inadequate hardware resources, or misconfigured Kafka brokers can increase lag too. In a production environment, it's essential to minimize lag to facilitate real-time or nearly real-time message processing, ensuring that consumers can effectively match the message production rate. Fig 1. Apache Kafka Consumer Lag Rate-limiting and backpressure are concepts related to managing and controlling the flow of data within a system, and they play a crucial role in handling Apache Kafka consumer lag. Rate-limiting involves controlling the speed at which data is processed or transmitted to prevent overwhelming a system. In the context of Kafka consumer lag, when consuming messages from a Kafka topic, rate-limiting can be applied to control the rate at which the consumer reads and processes messages. This is important to prevent a consumer from falling behind and experiencing lag. Backpressure is a mechanism used to handle situations where a downstream component or system is not able to keep up with the rate at which data is being sent to it. It signals the upstream system to slow down or stop producing data temporarily. In that respect, when a Kafka consumer is experiencing lag, it is not able to keep up with the rate at which messages are being produced. Backpressure mechanisms can be implemented to inform the producer (or an intermediate component) to slow down the production of messages until the consumer catches up. Using Rate-Limiting and Backpressure in Apache Kafka To implement rate-limiting, we can configure the Kafka consumer to limit the rate of message consumption. This can be achieved by adjusting the max.poll.records configuration or by introducing custom throttling mechanisms in the consumer application. The Kafka API also provides pause and resume methods: Kafka facilitates dynamic control of consumption flows through pause(Collection) and resume(Collection), enabling the suspension of consumption on specific assigned partitions. One way to implement backpressure is to store incoming records in a queue and process each one at a pace set by the queue's capacity. This is helpful if we want to make sure that the consumer can process records as they are produced without falling behind, or if the rate of message production is steady. We can also set enable.auto.commit=false on the consumer and commit only after the processing of each batch completes, avoiding auto-commit. This may slow down the consumer, but it ensures that Kafka's committed offsets reflect only the messages the consumer has actually processed.
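Putting these pieces together, the sketch below shows one way to combine a bounded in-memory queue with pause() and resume() using the standard Kafka Java client. The broker address, topic name, queue size, and property values (including max.poll.records and max.poll.interval.ms, discussed next) are illustrative assumptions rather than recommendations, and offset handling is deliberately simplified.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BackpressureConsumer {

    private static final int QUEUE_CAPACITY = 1_000;   // illustrative bound on in-flight records

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // assumption: local broker
        props.put("group.id", "lag-aware-consumer");
        props.put("enable.auto.commit", "false");                   // manual commits, as discussed above
        props.put("max.poll.records", "200");                       // illustrative batch size per poll
        props.put("max.poll.interval.ms", "300000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        BlockingQueue<ConsumerRecord<String, String>> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

        // Worker thread: drains the queue at its own pace, independent of the polling loop.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    ConsumerRecord<String, String> record = queue.take();
                    handle(record);                                  // placeholder for real processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));                   // hypothetical topic
            boolean paused = false;
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    queue.put(record);                               // blocks briefly if the worker is behind
                }
                // Backpressure: stop fetching while the queue is mostly full, resume once it drains.
                if (!paused && queue.remainingCapacity() < QUEUE_CAPACITY / 10) {
                    consumer.pause(consumer.assignment());
                    paused = true;
                } else if (paused && queue.size() < QUEUE_CAPACITY / 2) {
                    consumer.resume(consumer.assignment());
                    paused = false;
                }
                consumer.commitSync();   // simplified; in practice, commit only offsets the worker has finished
            }
        }
    }

    private static void handle(ConsumerRecord<String, String> record) {
        // Placeholder: transformation, enrichment, or write to a downstream system.
        System.out.printf("processed key=%s offset=%d%n", record.key(), record.offset());
    }
}

In a production consumer, the commit step would track the offsets the worker has actually finished, and the pause/resume thresholds would be tuned against the observed processing rate.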
We can also improve the process by setting the poll interval (max.poll.interval.ms) and the number of messages to be consumed in each poll (max.poll.records). Besides these consumer-side measures, we can consider using external tools or frameworks that support backpressure natively, such as Apache Flink. Various third-party monitoring tools and user interfaces offer an intuitive way to visualize Kafka lag metrics; options include Burrow, Grafana for ongoing monitoring, or a local Kafka UI connected to our production Kafka instance. Thank you for reading the article. If you found this content valuable, please consider liking and sharing your thoughts below.
This blog post explores the state of data streaming for the gaming industry in 2023. The evolution of casual and online games, Esports, social platforms, gambling, and new business models requires a reliable global data infrastructure, real-time end-to-end observability, fast time-to-market for new features, and integration with pioneering technologies like AI/machine learning, virtual reality, and cryptocurrencies. Data streaming allows integrating and correlating data in real time at any scale to improve most business processes in the gaming sector much more cost-efficiently. I look at game industry trends to explore how data streaming helps as a business enabler, including customer stories from Kakao Games, Mobile Premier League (MPL), Demonware / Blizzard, and more. A complete slide deck and on-demand video recording are included. Data Streaming in the Gaming Industry with Apache Kafka and Flink General Trends in the Gaming Industry The global gaming market is bigger than the music and film industries combined! Digitalization has been a huge factor in its growth over the past years. The gaming industry has various business models connecting players, fans, vendors, and other stakeholders: Hardware sales: Game consoles, VR sets, glasses. Game sales: Physical and digital. Free-to-play + in-game purchases: One-time in-game purchases (skins, champions, miscellaneous), gambling (loot boxes). Game-as-a-service (subscription): Seasonal in-game purchases like passes for theme events, mid-season invitational and world championship, passes for competitive play. Game-Infrastructure-as-a-Service: High-performance state synchronization, multiplayer, matchmaking, gaming statistics. Merchandise sales: T-shirts, souvenirs, fan equipment. Community: Esports broadcasts, ticket sales, franchising fees. Live betting. Video streaming: Subscriptions, advertisements, rewards. Growth and Innovation Require Cloud-Native Infrastructure Each industry requires a few specific characteristics: instant payments must be executed in real time without data loss; telecom infrastructure monitors huge volumes of logs in near real time; the retail industry needs to scale up for events like Christmas or Black Friday and scale down afterward. The gaming industry combines the characteristics of all these other industries: Real-time data processing. Scalability for millions of players. High availability, at least for transactional data. Decoupling for innovation and faster roll-out of new features. Cost efficiency, because cloud networking for huge volumes is expensive. The flexibility to adopt various innovative technologies and APIs. Elasticity for critical events a few times a year. Standards-based integration with SaaS, B2B, and mobile apps. Security for trusted customer data. Global and vendor-independent cloud infrastructure to deploy across countries. The good news is that data streaming powered by Apache Kafka and Apache Flink provides all these characteristics on a single platform, especially if you choose a fully managed SaaS offering. Data Streaming in the Gaming Industry Adopting gaming trends like in-game purchases, customer-specific discounts, and massively multiplayer online games (MMOG) is only possible if enterprises in the games sector can provide and correlate information at the right time in the proper context.
Real-time, which means using the information within milliseconds, seconds, or minutes, is almost always better than processing data later (whatever "later" means). Data streaming combines the power of real-time messaging at any scale with storage for true decoupling, data integration, and data correlation capabilities. Apache Kafka is the de facto standard for data streaming. "Apache Kafka in the Gaming Industry" is a great starting point to learn more about data streaming in the games sector, including a few Kafka-powered case studies not covered in this blog post, such as: Big Fish Games: Live operations by monitoring real-time analytics of game telemetry and context-specific recommendations for in-game purchases. Unity: Monetization network for player rewards, banner ads, playable advertisements, and cross-promotions. William Hill: Trading platform for gambling and betting. Disney+ Hotstar: Gamification of live sports video streaming. Architecture Trends for Data Streaming The gaming industry applies various trends for enterprise architectures for cost, elasticity, security, and latency reasons. The three major topics I see these days at customers are: Fully managed SaaS to focus on business logic and faster time-to-market. Event-driven architectures (in combination with request-response communication) to enable domain-driven design and flexible technology choices. Data mesh for building new data products and real-time data sharing with internal platforms and partner APIs. Let's look deeper into some enterprise architectures that leverage data streaming for gaming use cases. Cloud-Native Elasticity for Seasonal Spikes The games sector has extreme spikes in workloads. For instance, specific game events increase traffic 10x or more. Only cloud-native infrastructure enables a cost-efficient architecture. Epic Games presented at an AWS Summit as early as 2018 how elasticity is crucial for a data-driven architecture. Make sure to use a truly cloud-native Apache Kafka service for gaming infrastructure. Adding brokers is relatively easy; removing brokers is much harder. Hence, a fully managed SaaS should take over the complex operational challenges of distributed systems like Kafka and Flink for you. The separation of compute and storage is another critical piece of a cloud-native Kafka architecture to ensure cost-efficient scale. Data Mesh for Real-Time Data Sharing Data sharing across business units is important for any organization. The gaming industry has to combine different data sets, like big data game telemetry, monetization and advertisement transactions, and third-party interfaces. Data consistency is one of the most challenging problems in the games sector. Apache Kafka ensures data consistency across all applications and databases, whether these systems operate in real time, near real time, or batch. One sweet spot of data streaming is that you can easily connect new applications to the existing infrastructure or modernize existing interfaces, like migrating from an on-premise data warehouse to a cloud SaaS offering. New Customer Stories for Data Streaming in the Gaming Sector So much innovation is happening in the gaming sector. Automation and digitalization change how gaming companies process game telemetry data, build communities and customer relationships with VIPs, and create new business models with enterprises in other verticals.
Most gaming companies use a cloud-first approach to improve time-to-market, increase flexibility, and focus on business logic instead of operating IT infrastructure. Elastic scalability gets even more critical with growing real-time expectations and mobile app capabilities. Here are a few customer stories from worldwide gaming organizations: Kakao Games: Log analytics and fraud prevention. Mobile Premier League (MPL): Mobile eSports and digital gaming. Demonware / Blizzard: Network and gaming infrastructure. WhatNot: Retail gamification and social commerce. Vimeo: Video streaming observability. Resources To Learn More This blog post is just the starting point. Learn more about data streaming in the gaming industry in the following on-demand webinar recording, the related slide deck, and further resources, including pretty cool lightboard videos about use cases. On-Demand Video Recording The video recording explores the gaming industry's trends and architectures for data streaming. The primary focus is the data streaming case studies. I am excited to have presented this webinar in my interactive lightboard studio: this creates a much better experience, especially in a time after the pandemic when many people have "Zoom fatigue". Check out our on-demand recording: Slides If you prefer learning from slides, check out the deck used for the above recording here. Case Studies and Lightboard Videos for Data Streaming in the Gaming Industry The state of data streaming for gaming in 2023 is fascinating. New use cases and case studies come up every month. This includes better end-to-end observability in real time across the entire organization, telemetry data collection from gamers, data sharing and B2B partnerships with engines like Unity or video platforms like Twitch, new business models for ads and in-game purchases, and many more scenarios. Gaming is one of many industries that leverage data streaming with Apache Kafka and Apache Flink. Every month, we talk about the status of data streaming in a different industry. Manufacturing was first, financial services second, then retail, telcos, gaming, and so on. Check out my other blog posts. Let's connect on LinkedIn and discuss it!
Hybrid data warehouses can both ingest and process data in real time as streams and store and query this data in table formats. This dual functionality allows for low latency and high throughput in data processing, accommodating both streaming and batch analytics. Examples of such hybrid data warehouses include Apache Druid and Delta Lake. These technologies employ various methods, like columnar storage, indexing, caching, and concurrency control, to facilitate real-time data warehousing. Nonetheless, depending on their specific implementation and the use case, they may present complexity, reliability, or consistency challenges. As real-time data becomes increasingly critical in data engineering and analytics, choosing an appropriate data warehouse technology hinges on multiple factors. These include the data's volume, velocity, variety, and value, business needs, budget constraints, and available expertise. A thorough understanding of the strengths and limitations of each option can guide you in making a well-informed decision for constructing a robust and efficient data warehouse tailored to your real-time data requirements. What Is Apache Druid? Apache Druid is an open-source analytics database designed for high-performance real-time analytics. It's particularly well-suited for business intelligence (OLAP) queries on event data. Druid is commonly used in environments where real-time insights into large-scale data are crucial, such as e-commerce, financial services, and digital advertising. Key Features of Apache Druid Include: Real-Time Analytics: Druid excels at providing fast analytics on data as it's being ingested, enabling immediate insights into data streams. It offers rapid query execution across distributed systems and high-capacity data ingestion with low latency, and it handles various event data types well, including clickstream data, IoT data, and event data recorders (such as those used in Tesla vehicles). Scalability: Designed for scalability, it efficiently handles large volumes of data and can be scaled up to meet increased demand. Low Latency: Druid is optimized for low-latency queries, making it ideal for interactive applications where quick response times are critical. High-Throughput Ingestion: It can ingest massive amounts of event data with high throughput, making it suitable for applications like clickstream analytics, network monitoring, and fraud detection. Flexible Data Aggregation: It supports quick and flexible data aggregations, essential for summarizing and analyzing large datasets, and facilitates quick data slicing, dicing, and aggregation queries. Distributed Architecture: Its distributed architecture allows for robust fault tolerance and high availability, distributing data and query load across multiple servers. Columnar Storage: It uses a columnar storage format, which enhances performance for analytic queries. Time-Partitioned Data: Its architecture features time-based sharding and partitioning, column-oriented storage, indexing, data compression, and versioned, materialized views for high availability. Druid is often chosen for its ability to provide immediate insights, its support for both real-time and batch processing, and its robust scalability, making it a favorable choice for organizations needing to analyze large amounts of event-driven data quickly. Fig 1. Data Analytics Landscape A typical data ingestion, storage, and data serving layer using Druid is shown in Fig 2:
Fig 2. Typical data ingestion, storage, and data serving layer using Druid How Druid Operates: Its architecture is resilient and scalable, optimized for OLAP (Online Analytical Processing) with data formats designed for efficient analysis. Operations are massively parallelized, ensuring resource-aware processing during query execution and data ingestion. Druid allows for simultaneous ingestion of both batch and real-time data. Support for pre-fetch operations facilitates querying in under a second. Data tiering in Druid allows for the strategic utilization of infrastructure resources: it isolates long-running queries, ensuring they don't interfere with other operations. Key Components of Druid: Coordinator: The Druid Coordinator plays a crucial role in data distribution and management. It is responsible for managing the segments in Druid deep storage and for distributing copies of data to Historical nodes, significantly enhancing the efficiency and speed of query responses. By ensuring that data is appropriately populated onto Historical nodes, the Druid Coordinator effectively reduces latency, thereby facilitating high-speed queries. Overlord: The Druid Overlord is a key component in Apache Druid's architecture, primarily responsible for managing and coordinating data ingestion. Its primary functions include: Task Management: The Overlord oversees the assignment and supervision of data ingestion tasks, which can be either real-time or batch. It ensures these tasks are distributed and executed efficiently across the available resources. Scalability: It plays a crucial role in scaling the ingestion process, handling varying loads by dynamically assigning tasks to Middle Manager nodes. Fault Tolerance: In case of task failures, the Overlord is responsible for detecting these issues and reassigning the tasks to ensure continuous and reliable data ingestion. Load Balancing: The Overlord also manages the load on Druid's Middle Manager nodes, ensuring an even distribution of tasks for optimal performance. Router: The Druid Router is responsible for receiving queries from clients and directing them to the appropriate query-serving nodes, such as Broker nodes or directly to Historical nodes, depending on the query type and configuration. Broker: The Druid Broker is a critical component of the Apache Druid architecture, focusing on query processing and distribution. When a query is submitted to Druid, the Broker plays the central role in aggregating the results from various data nodes: it sends parts of the query to these nodes and then combines their results to form the final response. The Broker node knows the locations of data segments within the cluster and routes queries intelligently to the nodes containing the relevant segments, optimizing query execution for efficiency and speed. Brokers can also cache query results, which helps speed up the response time for frequent queries, as it avoids reprocessing the same data repeatedly. In summary, the Druid Broker is pivotal in orchestrating query processing within a Druid cluster, ensuring efficient query execution, result aggregation, and load balancing to optimize the performance and scalability of the system. Historicals: Druid Historical nodes are key components in the Apache Druid architecture, specifically designed for efficient data storage and retrieval. Here are their main characteristics: Single-Threaded Segment Processing: In Druid Historical nodes, each data segment is processed by a single thread.
This approach simplifies the processing model and helps in the efficient utilization of system resources for querying and data retrieval. Automatic Tiering: Historical nodes support automatic tiering of data. Data can be categorized into different tiers based on usage or other criteria. This tiering helps optimize storage and query performance, as frequently accessed data can be placed on faster, more accessible tiers. Data Management by Coordinator: The Druid Coordinator moves data into the appropriate tier within the Historical nodes. It manages data placement and ensures data is stored on the right tier, balancing load and optimizing storage utilization. Memory Mapping: Historical nodes use memory-mapped files for data storage. Memory mapping allows these nodes to leverage the operating system's virtual memory for data management, leading to efficient data access and reduced I/O overhead for queries. In essence, Druid Historical nodes are specialized for reliable and efficient long-term data storage and retrieval, with capabilities like single-threaded processing, automatic tiering, coordinator-led data management, and memory mapping to enhance performance. Middle Manager: The Druid Middle Manager is crucial in Apache Druid's data ingestion process. Middle Managers are pivotal in the data ingestion pipeline, handling the distribution and execution of ingestion tasks while ensuring scalability and efficient resource management. Data Ingestion Management: Middle Managers are responsible for managing data ingestion into the Druid system. They handle both real-time and batch data ingestion tasks. Task Distribution: Each Middle Manager node can run one or more tasks that ingest data. These tasks are assigned and monitored by the Druid Overlord, which distributes the ingestion workload among the available Middle Managers. Scalability: The architecture of Middle Managers allows for horizontal scalability. As data ingestion demands increase, more Middle Manager nodes can be added to the system to distribute the load effectively. Real-Time Data Processing: In the case of real-time data ingestion, Middle Managers handle the initial data processing and hand off the data to Historical nodes for long-term storage. Worker Nodes: Middle Managers act as worker nodes. They execute the tasks assigned by the Overlord, which can include data indexing, processing, and temporary storage. Fig 3. Druid Middle Manager SQL-Based Ingestion (An Example):

INSERT INTO tbl
SELECT
  TIME_PARSE("timestamp") AS __time,
  XXX,
  YYY,
  ZZZ
FROM TABLE(
  EXTERN(
    '{"type": "s3", "uris": ["s3://bucket/file"]}',
    '{"type": "json"}',
    '[{"name": "XXX", "type": "string"}, {"name": "YYY", "type": "string"}, {"name": "ZZZ", "type": "string"}, {"name": "timestamp", "type": "string"}]'
  )
)
PARTITIONED BY FLOOR(__time TO DAY)
CLUSTERED BY XXX

JSON-Based Ingestion (An Example): Fig 4. Example of JSON-based ingestion Fig 5. Basic functionality of Deep Storage Deep Storage: Deep storage in Apache Druid is a scalable and durable storage system for permanent data retention; it provides a robust solution crucial for maintaining data integrity and availability in large-scale data analytics and business intelligence operations. Permanent Storage Layer: Deep storage acts as the primary data repository for Druid, where all the ingested data is stored for long-term retention. This is crucial for ensuring data persistence beyond the lifetime of the individual Druid processes.
Support for Various Storage Systems: Druid is designed to be agnostic to the underlying storage system. It can integrate with deep storage solutions like Amazon S3, Google Cloud Storage, Hadoop Distributed File System (HDFS), and Microsoft Azure Storage. Data Segmentation: Data in deep storage is organized into segments, which are essentially partitioned, compressed, and indexed files. This segmentation aids in efficient data retrieval and querying. Fault Tolerance and Recovery: Deep storage provides the resilience to recover and reload data segments in the event of a system failure. This ensures that data is not lost and can be accessed consistently. Scalability: Deep storage scales independently of the compute resources. As data grows, deep storage can be expanded without impacting the performance of the Druid cluster. Decoupling of Storage and Processing: By separating storage and processing, Druid allows for flexible and cost-effective resource management. Compute resources can be scaled up or down as needed, independent of the data volume in deep storage. Data Backup and Archival: Deep storage also serves as a backup and archival solution, ensuring that historical data is preserved and can be accessed for future analysis. Segments in Deep Storage: Segments in deep storage within Apache Druid have distinct characteristics that optimize storage efficiency and query performance. Each segment typically contains between 3 and 5 million rows of data. This size balances granularity for efficient data processing against being large enough to ensure good compression and query performance. Data within a segment is partitioned based on time. This time-partitioning is central to Druid's architecture, as it allows for efficient handling and querying of time-series data. Within a segment, data can be clustered by dimension values. This clustering enhances the performance of queries that filter or aggregate data based on these dimensions. Once created, segments are immutable: they do not change. Each segment is versioned, enabling Druid to maintain different versions of the same data. This immutability and versioning are crucial for effective caching, as the cache remains valid until the segment is replaced or updated. Segments in Druid are self-describing, meaning they contain metadata about their structure and schema. This feature is important for schema evolution, as it allows Druid to understand and process segments even when the schema changes over time. These aspects of segment design are essential for Druid's high-performance analytics capabilities, especially in handling large volumes of time-series data, optimizing query performance, and ensuring data consistency and reliability. Some key features of segments are: Columnar Format: The data in deep storage is stored in a columnar format. This means each column of data is stored separately, enhancing query performance, especially for analytics and aggregation queries, as only the necessary columns need to be read and processed. Dictionary Encoding: Dictionary encoding is used to store data efficiently. It involves creating a unique dictionary of values for a column, where a compact identifier replaces each value. This approach significantly reduces the storage space required for repetitive or similar data. Compressed Representations: Data in segments is compressed to reduce its size in deep storage. Compression reduces storage cost and speeds up data transfer between storage and processing nodes.
Bitmap Indexes: Bitmap indexes are utilized for fast querying, especially for filtering and searching operations. They allow for efficient querying on high-cardinality columns by quickly identifying the rows that match the query criteria. Other Features of Druid: Apache Druid includes additional advanced features that enhance its performance and flexibility in data analytics. These features include: Multiple Levels of Caching: Druid implements caching at various levels within its architecture, from the broker to the data nodes. This multi-tiered caching strategy includes: Broker Caching: Caches the results of queries at the broker level, which can significantly speed up response times for repeated queries. Historical Node Caching: Caches data segments in Historical nodes, improving query performance on frequently accessed data. Query-Level Caching: Allows caching of partial query results, which can be reused in subsequent queries. Query Lanes and Prioritization: Druid supports query laning and prioritization, which are essential for managing and optimizing query workloads. This feature allows administrators to categorize and prioritize queries based on their importance or urgency. For example, critical real-time queries can be prioritized over less urgent batch queries, ensuring that important tasks are completed first. Approximation and Vectorization: Approximation Algorithms: Druid can use various approximation algorithms (like HyperLogLog, Theta Sketches, etc.) to provide faster query responses, which is especially useful for aggregations and counts over large datasets. These algorithms trade a small amount of accuracy for significant gains in speed and resource efficiency. Vectorization: Vectorization refers to processing data in batches rather than one element at a time. Vectorized query execution allows Druid to perform operations on multiple data points simultaneously, significantly speeding up query performance, especially on modern hardware with SIMD (Single Instruction, Multiple Data) capabilities. Summary: The components and features discussed above make Druid a highly efficient and adaptable system for real-time analytics, capable of handling large volumes of data with varying query workloads while ensuring fast and resource-efficient data processing.
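To make the query side concrete, below is a minimal sketch that submits a Druid SQL query to the SQL HTTP endpoint (POST /druid/v2/sql) using only the JDK's built-in HTTP client. The router address, the clickstream datasource, and the user_id column are assumptions for illustration, and the APPROX_COUNT_DISTINCT call exercises the sketch-based approximation described above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DruidSqlQueryExample {
    public static void main(String[] args) throws Exception {
        // Assumption: a Druid router (or broker) reachable at this address.
        String endpoint = "http://localhost:8888/druid/v2/sql";

        // Hypothetical datasource and columns; APPROX_COUNT_DISTINCT uses Druid's sketch-based approximation.
        String sql = "SELECT FLOOR(__time TO HOUR) AS hour_bucket, "
                   + "COUNT(*) AS events, "
                   + "APPROX_COUNT_DISTINCT(user_id) AS unique_users "
                   + "FROM clickstream "
                   + "WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY "
                   + "GROUP BY 1 ORDER BY 1";

        // The SQL API expects a JSON body of the form {"query": "..."}.
        String body = "{\"query\": \"" + sql.replace("\"", "\\\"") + "\"}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(endpoint))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // By default, rows come back as a JSON array of objects.
        System.out.println("HTTP " + response.statusCode());
        System.out.println(response.body());
    }
}

The same statement could also be issued through Druid's JDBC (Avatica) driver or the web console; the raw HTTP form is used here only because it needs no dependencies beyond the JDK.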
Miguel Garcia, VP of Engineering, Nextail Labs
Gautam Goswami, Founder, DataView