Introduction to Datafaker Datafaker is a modern framework that enables JVM programmers to efficiently generate fake data for their projects using over 200 data providers allowing for quick setup and usage. Custom providers could be written when you need some domain-specific data. In addition to providers, the generated data can be exported to popular formats like CSV, JSON, SQL, XML, and YAML. For a good introduction to the basic features, see "Datafaker: An Alternative to Using Production Data." Datafaker offers many features, such as working with sequences and collections and generating custom objects based on schemas (see "Datafaker 2.0"). Bulk Data Generation In software development and testing, the need to frequently generate data for various purposes arises, whether it's to conduct non-functional tests or to simulate burst loads. Let's consider a straightforward scenario when we have the task of generating 10,000 messages in JSON format to be sent to RabbitMQ. From my perspective, these options are worth considering: Developing your own tool: One option is to write a custom application from scratch to generate these records(messages). If the generated data needs to be more realistic, it makes sense to use Datafaker or JavaFaker. Using specific tools: Alternatively, we could select specific tools designed for particular databases or message brokers. For example, tools like voluble for Kafka provide specialized functionalities for generating and publishing messages to Kafka topics; or a more modern tool like ShadowTraffic, which is currently under development and directed towards a container-based approach, which may not always be necessary. Datafaker Gen: Finally, we have the option to use Datafaker Gen, which I want to consider in the current article. Datafaker Gen Overview Datafaker Gen offers a command-line generator based on the Datafaker library which allows for the continuous generation of data in various formats and integration with different storage systems, message brokers, and backend services. Since this tool uses Datafaker, there may be a possibility that the data is realistic. Configuration of the scheme, format type, and sink can be done without rebuilding the project. Datafake Gen consists of the following main components that can be configured: 1. Schema Definition Users can define the schema for their records in the config.yaml file. The schema specifies the field definitions of the record based on the Datafaker provider. It also allows for the definition of embedded fields. YAML default_locale: en-EN fields: - name: lastname generators: [ Name#lastName ] - name: firstname generators: [ Name#firstName ] 2. Format Datafake Gen allows users to specify the format in which records will be generated. Currently, there are basic implementations for CSV, JSON, SQL, XML, and YAML formats. Additionally, formats can be extended with custom implementations. The configuration for formats is specified in the output.yaml file. YAML formats: csv: quote: "@" separator: $$$$$$$ json: formattedAs: "[]" yaml: xml: pretty: true 3. Sink The sink component determines where the generated data will be stored or published. The basic implementation includes command-line output and text file sinks. Additionally, sinks can be extended with custom implementations such as RabbitMQ, as demonstrated in the current article. The configuration for sinks is specified in the output.yaml file. 
YAML sinks: rabbitmq: batchsize: 1 # when 1 message contains 1 document, when >1 message contains a batch of documents host: localhost port: 5672 username: guest password: guest exchange: test.direct.exchange routingkey: products.key Extensibility via Java SPI Datafake Gen uses the Java SPI (Service Provider Interface) to make it easy to add new formats or sinks. This extensibility allows for customization of Datafake Gen according to specific requirements. How To Add a New Sink in Datafake Gen Before adding a new sink, you may want to check if it already exists in the datafaker-gen-examples repository. If it does not exist, you can refer to examples on how to add a new sink. When it comes to extending Datafake Gen with new sink implementations, developers have two primary options to consider: By using this parent project, developers can implement sink interfaces for their sink extensions, similar to those available in the datafaker-gen-examples repository. Include dependencies from the Maven repository to access the required interfaces. For this approach, Datafake Gen should be built and exist in the local Maven repository. This approach provides flexibility in project structure and requirements. 1. Implementing RabbitMQ Sink To add a new RabbitMQ sink, one simply needs to implement the net.datafaker.datafaker_gen.sink.Sink interface. This interface contains two methods: getName - This method defines the sink name. run - This method triggers the generation of records and then sends or saves all the generated records to the specified destination. The method parameters include the configuration specific to this sink retrieved from the output.yaml file as well as the data generation function and the desired number of lines to be generated. Java import net.datafaker.datafaker_gen.sink.Sink; public class RabbitMqSink implements Sink { @Override public String getName() { return "rabbitmq"; } @Override public void run(Map<String, ?> config, Function<Integer, ?> function, int numberOfLines) { // Read output configuration ... int numberOfLinesToPrint = numberOfLines; String host = (String) config.get("host"); // Generate lines String lines = (String) function.apply(numberOfLinesToPrint); // Sending or saving results to the expected resource // In this case, this is connecting to RebbitMQ and sending messages. ConnectionFactory factory = getConnectionFactory(host, port, username, password); try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); JsonArray jsonArray = JsonParser.parseString(lines).getAsJsonArray(); jsonArray.forEach(jsonElement -> { try { channel.basicPublish(exchange, routingKey, null, jsonElement.toString().getBytes()); } catch (Exception e) { throw new RuntimeException(e); } }); } catch (Exception e) { throw new RuntimeException(e); } } } 2. Adding Configuration for the New RabbitMQ Sink As previously mentioned, the configuration for sinks or formats can be added to the output.yaml file. The specific fields may vary depending on your custom sink. Below is an example configuration for a RabbitMQ sink: YAML sinks: rabbitmq: batchsize: 1 # when 1 message contains 1 document, when >1 message contains a batch of documents host: localhost port: 5672 username: guest password: guest exchange: test.direct.exchange routingkey: products.key 3. 
Adding Custom Sink via SPI Adding a custom sink via SPI (Service Provider Interface) involves including the provider configuration in the ./resources/META-INF/services/net.datafaker.datafaker_gen.sink.Sink file. This file contains paths to the sink implementations: Properties files net.datafaker.datafaker_gen.sink.RabbitMqSink These are the three simple steps needed to extend Datafake Gen. This example does not show the complete sink implementation or the use of additional libraries. To see the complete implementations, you can refer to the datafaker-gen-rabbitmq module in the example repository. How To Run Step 1 Build a JAR file based on the new implementation: Shell ./mvnw clean verify Step 2 Define the schema for records in the config.yaml file and place this file in the appropriate location where the generator should run. Additionally, define the sinks and formats in the output.yaml file, as demonstrated previously. Step 3 Datafake Gen can be executed through two options: 1. Use the bash script from the bin folder in the parent project: Shell # Format json, number of lines 10000 and new RabbitMq Sink bin/datafaker_gen -f json -n 10000 -sink rabbitmq 2. Execute the JAR directly, like this: Shell java -cp [path_to_jar] net.datafaker.datafaker_gen.DatafakerGen -f json -n 10000 -sink rabbitmq How Fast Is It? The test was done based on the schema described above, which means that one document consists of two fields. Documents are recorded one by one in the RabbitMQ queue in JSON format. The table below shows the speed for 10,000, 100,000, and 1M records on my local machine:

| Records   | Time       |
|-----------|------------|
| 10,000    | 401 ms     |
| 100,000   | 11,613 ms  |
| 1,000,000 | 121,601 ms |

Conclusion The Datafake Gen tool enables the creation of flexible and fast data generators for various types of destinations. Built on Datafaker, it facilitates realistic data generation. Developers can easily configure the content of records, formats, and sinks to suit their needs. As a simple Java application, it can be deployed anywhere you want, whether it's in Docker or on-premise machines. The full source code is available here. I would like to thank Sergey Nuyanzin for reviewing this article. Thank you for reading, and I am glad to be of help.
NoSQL databases provide a flexible and scalable option for storing and retrieving data. However, they can struggle to accommodate object-oriented programming paradigms such as inheritance, which is a fundamental concept in languages like Java. This article explores the impedance mismatch when dealing with inheritance in NoSQL databases. The Inheritance Challenge in NoSQL Databases The term “impedance mismatch” refers to the disconnect between the object-oriented world of programming languages like Java and NoSQL databases’ tabular, document-oriented, or graph-based structures. One area where this mismatch is particularly evident is in handling inheritance. In Java, inheritance allows you to create a hierarchy of classes, where a subclass inherits properties and behaviors from its parent class. This concept is deeply ingrained in Java programming and is often used to model real-world relationships. However, NoSQL databases have no joins, and the inheritance structure needs to be handled differently. Jakarta Persistence (JPA) and Inheritance Strategies Before diving into more advanced solutions, it’s worth mentioning that there are strategies to simulate inheritance in relational databases in the world of Jakarta Persistence (formerly known as JPA). These strategies include: JOINED inheritance strategy: In this approach, fields specific to a subclass are mapped to a separate table from the fields common to the parent class. A join operation is performed to instantiate the subclass when needed. SINGLE_TABLE inheritance strategy: This strategy uses a single table representing the entire class hierarchy. Discriminator columns are used to differentiate between different subclasses. TABLE_PER_CLASS inheritance strategy: Each concrete entity class in the hierarchy corresponds to its own table in the database. These strategies work well in relational databases but are not directly applicable to NoSQL databases, primarily because NoSQL databases do not support traditional joins. Live Code Session: Java SE, Eclipse JNoSQL, and MongoDB In this live code session, we will create a Java SE project using MongoDB as our NoSQL database. We’ll focus on managing game characters, specifically Mario and Sonic characters, using Eclipse JNoSQL. You can run MongoDB locally using Docker or in the cloud with MongoDB Atlas. We’ll start with the database setup and then proceed to the Java code implementation. Setting Up MongoDB Locally To run MongoDB locally, you can use Docker with the following command: Shell docker run -d --name mongodb-instance -p 27017:27017 mongo Alternatively, you can choose to execute it in the cloud by following the instructions provided by MongoDB Atlas. With the MongoDB database up and running, let’s create our Java project. Creating the Java Project We’ll create a Java SE project using Maven and the maven-archetype-quickstart archetype.
This project will utilize the following technologies and dependencies: Jakarta CDI Jakarta JSONP Eclipse MicroProfile Eclipse JNoSQL database Maven Dependencies Add the following dependencies to your project’s pom.xml file: XML <dependencies> <dependency> <groupId>org.jboss.weld.se</groupId> <artifactId>weld-se-shaded</artifactId> <version>${weld.se.core.version}</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse</groupId> <artifactId>yasson</artifactId> <version>3.0.3</version> <scope>compile</scope> </dependency> <dependency> <groupId>io.smallrye.config</groupId> <artifactId>smallrye-config-core</artifactId> <version>3.2.1</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.microprofile.config</groupId> <artifactId>microprofile-config-api</artifactId> <version>3.0.2</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.eclipse.jnosql.databases</groupId> <artifactId>jnosql-mongodb</artifactId> <version>${jnosql.version}</version> </dependency> <dependency> <groupId>net.datafaker</groupId> <artifactId>datafaker</artifactId> <version>2.0.2</version> </dependency> </dependencies> Make sure to replace ${jnosql.version} with the appropriate version of Eclipse JNoSQL you intend to use. In the next section, we will proceed with implementing our Java code. Implementing Our Java Code Our GameCharacter class will serve as the parent class for all game characters and will hold the common attributes shared among them. We’ll use inheritance and discriminator columns to distinguish between Sonic’s and Mario’s characters. Here’s the initial definition of the GameCharacter class: Java @Entity @DiscriminatorColumn("type") @Inheritance public abstract class GameCharacter { @Id @Convert(UUIDConverter.class) protected UUID id; @Column protected String character; @Column protected String game; public abstract GameType getType(); } In this code: We annotate the class with @Entity to indicate that it is a persistent entity in our MongoDB database. We use @DiscriminatorColumn("type") to specify that a discriminator column named “type” will be used to differentiate between subclasses. @Inheritance indicates that this class is part of an inheritance hierarchy. The GameCharacter class has a unique identifier (id), attributes for character name (character) and game name (game), and an abstract method getType(), which its subclasses will implement to specify the character type. Specialization Classes: Sonic and Mario Now, let’s create the specialization classes for Sonic and Mario entities. These classes will extend the GameCharacter class and provide additional attributes specific to each character type. We’ll use @DiscriminatorValue to define the values the “type” discriminator column can take for each subclass. Java @Entity @DiscriminatorValue("SONIC") public class Sonic extends GameCharacter { @Column private String zone; @Override public GameType getType() { return GameType.SONIC; } } In the Sonic class: We annotate it with @Entity to indicate it’s a persistent entity. @DiscriminatorValue("SONIC") specifies that the “type” discriminator column will have the value “SONIC” for Sonic entities. We add an attribute zone-specific to Sonic characters. The getType() method returns GameType.SONIC, indicating that this is a Sonic character. 
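The getType() method returns a value of the GameType enum, which the article references but never lists. A minimal sketch of what it might look like, assuming only the two character families used in this example (Sonic and Mario):

Java
// Hypothetical enum backing getType(); the article does not show its definition.
public enum GameType {
    SONIC,
    MARIO
}

Any additional character family would add a constant here together with a matching @DiscriminatorValue subclass.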
Java @Entity @DiscriminatorValue("MARIO") public class Mario extends GameCharacter { @Column private String locations; @Override public GameType getType() { return GameType.MARIO; } } Similarly, in the Mario class: We annotate it with @Entity to indicate it’s a persistent entity. @DiscriminatorValue("MARIO") specifies that the “type” discriminator column will have the value “MARIO” for Mario entities. We add an attribute locations specific to Mario characters. The getType() method returns GameType.MARIO, indicating that this is a Mario character. With this modeling approach, you can easily distinguish between Sonic and Mario characters in your MongoDB database using the discriminator column “type.” We will create our first database integration with MongoDB using Eclipse JNoSQL. To simplify, we will generate data using the Data Faker library. Our Java application will insert Mario and Sonic characters into the database and perform basic operations. Application Code Here’s the main application code that generates and inserts data into the MongoDB database: Java public class App { public static void main(String[] args) { try (SeContainer container = SeContainerInitializer.newInstance().initialize()) { DocumentTemplate template = container.select(DocumentTemplate.class).get(); DataFaker faker = new DataFaker(); Mario mario = Mario.of(faker.generateMarioData()); Sonic sonic = Sonic.of(faker.generateSonicData()); // Insert Mario and Sonic characters into the database template.insert(List.of(mario, sonic)); // Count the total number of GameCharacter documents long count = template.count(GameCharacter.class); System.out.println("Total of GameCharacter: " + count); // Find all Mario characters in the database List<Mario> marioCharacters = template.select(Mario.class).getResultList(); System.out.println("Find all Mario characters: " + marioCharacters); // Find all Sonic characters in the database List<Sonic> sonicCharacters = template.select(Sonic.class).getResultList(); System.out.println("Find all Sonic characters: " + sonicCharacters); } } } In this code: We use the SeContainer to manage our CDI container and initialize the DocumentTemplate from Eclipse JNoSQL. We create instances of Mario and Sonic characters using data generated by the DataFaker class. We insert these characters into the MongoDB database using the template.insert() method. We count the total number of GameCharacter documents in the database. We retrieve and display all Mario and Sonic characters from the database. Resulting Database Structure As a result of running this code, you will see data in your MongoDB database similar to the following structure: JSON [ { "_id": "39b8901c-669c-49db-ac42-c1cabdcbb6ed", "character": "Bowser", "game": "Super Mario Bros.", "locations": "Mount Volbono", "type": "MARIO" }, { "_id": "f60e1ada-bfd9-4da7-8228-6a7f870e3dc8", "character": "Perfect Chaos", "game": "Sonic Rivals 2", "type": "SONIC", "zone": "Emerald Hill Zone" } ] As shown in the database structure, each document contains a unique identifier (_id), character name (character), game name (game), and a discriminator column type to differentiate between Mario and Sonic characters. You will see more characters in your MongoDB database depending on your generated data. This integration demonstrates how to insert, count, and retrieve game characters using Eclipse JNoSQL and MongoDB. You can extend and enhance this application to manage and manipulate your game character data as needed. 
We will create repositories for managing game characters using Eclipse JNoSQL. We will have a Console repository for general game characters and a SonicRepository specifically for Sonic characters. These repositories will allow us to interact with the database and perform various operations easily. Let’s define the repositories for our game characters. Console Repository Java @Repository public interface Console extends PageableRepository<GameCharacter, UUID> { } The Console repository extends PageableRepository and is used for general game characters. It provides common CRUD operations and pagination support. Sonic Repository Java @Repository public interface SonicRepository extends PageableRepository<Sonic, UUID> { } The SonicRepository extends PageableRepository but is specifically designed for Sonic characters. It inherits common CRUD operations and pagination from the parent repository. Main Application Code Now, let’s modify our main application code to use these repositories. For Console Repository Java public static void main(String[] args) { Faker faker = new Faker(); try (SeContainer container = SeContainerInitializer.newInstance().initialize()) { Console repository = container.select(Console.class).get(); for (int index = 0; index < 5; index++) { Mario mario = Mario.of(faker); Sonic sonic = Sonic.of(faker); repository.saveAll(List.of(mario, sonic)); } long count = repository.count(); System.out.println("Total of GameCharacter: " + count); System.out.println("Find all game characters: " + repository.findAll().toList()); } System.exit(0); } In this code, we use the Console repository to save both Mario and Sonic characters, demonstrating its ability to manage general game characters. For Sonic Repository Java public static void main(String[] args) { Faker faker = new Faker(); try (SeContainer container = SeContainerInitializer.newInstance().initialize()) { SonicRepository repository = container.select(SonicRepository.class).get(); for (int index = 0; index < 5; index++) { Sonic sonic = Sonic.of(faker); repository.save(sonic); } long count = repository.count(); System.out.println("Total of Sonic characters: " + count); System.out.println("Find all Sonic characters: " + repository.findAll().toList()); } System.exit(0); } This code uses the SonicRepository to save Sonic characters specifically. It showcases how to work with a repository dedicated to a particular character type. With these repositories, you can easily manage, query, and filter game characters based on their type, simplifying the code and making it more organized. Conclusion In this article, we explored the seamless integration of MongoDB with Java using the Eclipse JNoSQL framework for efficient game character management. We delved into the intricacies of modeling game characters, addressing challenges related to inheritance in NoSQL databases while maintaining compatibility with Java's object-oriented principles. By employing discriminator columns, we could categorize characters and store them within the MongoDB database, creating a well-structured and extensible solution. Through our Java application, we demonstrated how to generate sample game character data using the Data Faker library and efficiently insert it into MongoDB. We performed essential operations, such as counting the number of game characters and retrieving specific character types. 
Moreover, we introduced the concept of repositories in Eclipse JNoSQL, showcasing their value in simplifying data management and enabling focused queries based on character types. This article provides a solid foundation for harnessing the power of Eclipse JNoSQL and MongoDB to streamline NoSQL database interactions in Java applications, making it easier to manage and manipulate diverse data sets. Source code
Imagine you have an AI-powered personal alerting chat assistant that interacts using up-to-date data. Whether it's a big move in the stock market that affects your investments, any significant change on your shared SharePoint documents, or discounts on Amazon you were waiting for, the application is designed to keep you informed and alert you about any significant changes based on the criteria you set in advance using natural language. In this post, we will learn how to build a full-stack event-driven weather alert chat application in Python using pretty cool tools: Streamlit, NATS, and OpenAI. The app can collect real-time weather information, understand your criteria for alerts using AI, and deliver these alerts to the user interface. This article and its code samples can help technology enthusiasts and developers understand how modern real-time alerting systems work with Large Language Models (LLMs) and how to implement one. You can also quickly jump to the source code hosted on our GitHub and try it yourself. The Power Behind the Scenes Let's take a closer look at how the AI weather alert chat application works and transforms raw data into actionable alerts, keeping you one step ahead of the weather. At the core of our application lies a responsive backend implemented in Python, powered by NATS to ensure real-time data processing and message management. Integrating OpenAI's GPT model brings a conversational AI to life, capable of understanding alerts' nature and responding to user queries. Users can specify their alert criteria in natural language, and the GPT model will interpret them. Image 1: Real-time alert app architecture Real-Time Data Collection The journey begins with the continuous asynchronous collection of weather data from various sources in the backend. Our application now uses the api.weatherapi.com service, fetching real-time weather information every 10 seconds. This data includes temperature, humidity, precipitation, and more, covering locations worldwide. This snippet asynchronously fetches current weather data for Estonia, but the app can be improved to set the location dynamically from user input: async def fetch_weather_data(): api_url = f"http://api.weatherapi.com/v1/current.json?key={weather_api_key}&q=estonia" try: async with aiohttp.ClientSession() as session: async with session.get(api_url) as response: if response.status == 200: return await response.json() else: logging.error(f"Error fetching weather data: HTTP {response.status}") return None except Exception as e: logging.error(f"Error fetching weather data: {e}") return None The Role of NATS in Data Streaming The code segment in the main() function in the backend.py file demonstrates the integration of NATS for event-driven messaging, continuous weather monitoring, and alerting. We use the nats.py library to integrate NATS within Python code. First, we establish a connection to the NATS server running in Docker at nats://localhost:4222. nats_client = await nats.connect("nats://localhost:4222") Then, we define an asynchronous message_handler function that subscribes to and processes messages received on the chat subject from the NATS server. If a message starts with "Set Alert:" (we append it on the frontend side), it extracts and updates the user's alert criteria.
async def message_handler(msg): nonlocal user_alert_criteria data = msg.data.decode() if data.startswith("Set Alert:"): user_alert_criteria = data[len("Set Alert:"):].strip() logging.info(f"User alert criteria updated: {user_alert_criteria}") await nats_client.subscribe("chat", cb=message_handler) The backend service integrates with both external services like Weather API and Open AI Chat Completion API. If both weather data and user alert criteria are present, the app constructs a prompt for OpenAI’s GPT model to determine if the weather meets the user’s criteria. The prompt asks the AI to analyze the current weather against the user’s criteria and respond with “YES” or “NO” and a brief weather summary. Once the AI determines that the incoming weather data matches a user’s alert criteria, it crafts a personalized alert message and publishes a weather alert to the chat_response subject on the NATS server to update the frontend app with the latest changes. This message contains user-friendly notifications designed to inform and advise the user. For example, it might say, "Heads up! Rain is expected in Estonia tomorrow. Don't forget to bring an umbrella!" while True: current_weather = await fetch_weather_data() if current_weather and user_alert_criteria: logging.info(f"Current weather data: {current_weather}") prompt = f"Use the current weather: {current_weather} information and user alert criteria: {user_alert_criteria}. Identify if the weather meets these criteria and return only YES or NO with a short weather temperature info without explaining why." response_text = await get_openai_response(prompt) if response_text and "YES" in response_text: logging.info("Weather conditions met user criteria.") ai_response = f"Weather alert! Your specified conditions have been met. {response_text}" await nats_client.publish("chat_response", payload=ai_response.encode()) else: logging.info("Weather conditions did not meet user criteria.") else: logging.info("No current weather data or user alert criteria set.")await asyncio.sleep(10) Delivering and Receiving Alerts in Real-Time Let’s understand the overall communication flow between the backend and frontend. Through a simple chat interface built using Streamlit (see frontend.py file), the user inputs their weather alert criteria using natural language and submits it. alert_criteria = st.text_input("Set your weather alert criteria", key="alert_criteria", disabled=st.session_state['alert_set']) Below, Streamlit frontend code interacts with a backend service via NATS messaging. It publishes these criteria to the NATS server on the chat subject. def send_message_to_nats_handler(message): with NATSClient() as client: client.connect() client.publish("chat", payload=message.encode()) client.subscribe("chat_response", callback=read_message_from_nats_handler) client.wait() if set_alert_btn: st.session_state['alert_set'] = True st.success('Alert criteria set') send_message_to_nats_handler(f"Set Alert: {alert_criteria}") As we have seen in the previous section, the backend listens to the chat subject, receives the criteria, fetches current weather data, and uses AI to determine if an alert should be triggered. If conditions are met, the backend sends an alert message to the chat_response subject. The front end receives this message and updates the UI to notify the user. 
def read_message_from_nats_handler(msg): message = msg.payload.decode() st.session_state['conversation'].append(("AI", message)) st.markdown(f"<span style='color: red;'></span> AI: {message}", unsafe_allow_html=True) Try It Out To explore the real-time weather alert chat application in detail and try it out for yourself, please visit our GitHub repository linked earlier. The repository contains all the necessary code, detailed setup instructions, and additional documentation to help you get started. Once the setup is complete, you can start the Streamlit frontend and the Python backend. Set your weather alert criteria, and see how the system processes real-time weather data to keep you informed. Image 2: Streamlit UI for the alert app Building Stream Processing Pipelines The real-time weather alert chat application demonstrated a powerful use case of NATS for real-time messaging in a distributed system, allowing for efficient communication between a user-facing frontend and a data-processing backend. However, you should consider several key steps to ensure that the information presented to the user is relevant, accurate, and actionable. In the app, we are just fetching live raw weather data and sending it straightaway to OpenAI or the front end. Sometimes you need to transform this data to filter, enrich, aggregate, or normalize it in real time before it reaches the external services. You start to think about creating a stream processing pipeline with several stages. For example, not all the data fetched from the API will be relevant to every user, so you can filter out unnecessary information at an initial stage. Also, data can come in various formats, especially if you're sourcing information from multiple APIs for comprehensive alerting, and you need to normalize this data. At the next stage, you enrich the raw data with extra context or information to make it more useful. This could include comparing current weather conditions against historical data to identify unusual patterns or adding location-based insights using another external API, such as specific advice for weather conditions in a particular area. At later stages, you might aggregate hourly temperature data to give an average daytime temperature or to highlight the peak temperature reached during the day. Next Steps When it comes to transforming data, deploying, running, and scaling the app in a production environment, you might want to use dedicated frameworks in Python like GlassFlow to build sophisticated stream-processing pipelines. GlassFlow offers a fully managed serverless infrastructure for stream processing, so you don't have to think about setup or maintenance, and the app can handle large volumes of data and user requests with ease. It provides advanced state management capabilities, making it easier to track user alert criteria and other application states. Your application can scale with its user base without compromising performance.
In a previous blog, the influence of the document format and the way it is embedded in combination with semantic search was discussed. LangChain4j was used to accomplish this. The way the document was embedded has a major influence on the results. This was one of the main conclusions. However, a perfect result was not achieved. In this post, you will take a look at Weaviate, a vector database that has a Java client library available. You will investigate whether better results can be achieved. The source documents are two Wikipedia documents. You will use the discography and list of songs recorded by Bruce Springsteen. The interesting part of these documents is that they contain facts and are mainly in a table format. Parts of these documents are converted to Markdown in order to have a better representation. The same documents were used in the previous blog, so it will be interesting to see how the findings from that post compare to the approach used in this post. The sources used in this blog can be found on GitHub. Prerequisites The prerequisites for this blog are: Basic knowledge of embedding and vector stores Basic Java knowledge, Java 21 is used Basic knowledge of Docker The Weaviate starter guides are also interesting reading material. How to Implement Vector Similarity Search 1. Installing Weaviate There are several ways to install Weaviate. An easy installation is through Docker Compose. Just use the sample Docker Compose file. YAML version: '3.4' services: weaviate: command: - --host - 0.0.0.0 - --port - '8080' - --scheme - http image: semitechnologies/weaviate:1.23.2 ports: - 8080:8080 - 50051:50051 volumes: - weaviate_data:/var/lib/weaviate restart: on-failure:0 environment: QUERY_DEFAULTS_LIMIT: 25 AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true' PERSISTENCE_DATA_PATH: '/var/lib/weaviate' DEFAULT_VECTORIZER_MODULE: 'none' ENABLE_MODULES: 'text2vec-cohere,text2vec-huggingface,text2vec-palm,text2vec-openai,generative-openai,generative-cohere,generative-palm,ref2vec-centroid,reranker-cohere,qna-openai' CLUSTER_HOSTNAME: 'node1' volumes: weaviate_data: Start the Compose file from the root of the repository. Shell $ docker compose -f docker/compose-initial.yaml up You can shut it down with CTRL+C or with the following command: Shell $ docker compose -f docker/compose-initial.yaml down 2. Connect to Weaviate First, let’s try to connect to Weaviate through the Java library. Add the following dependency to the pom file: XML <dependency> <groupId>io.weaviate</groupId> <artifactId>client</artifactId> <version>4.5.1</version> </dependency> The following code will create a connection to Weaviate and display some metadata information about the instance. 
Java Config config = new Config("http", "localhost:8080"); WeaviateClient client = new WeaviateClient(config); Result<Meta> meta = client.misc().metaGetter().run(); if (meta.getError() == null) { System.out.printf("meta.hostname: %s\n", meta.getResult().getHostname()); System.out.printf("meta.version: %s\n", meta.getResult().getVersion()); System.out.printf("meta.modules: %s\n", meta.getResult().getModules()); } else { System.out.printf("Error: %s\n", meta.getError().getMessages()); } The output is the following: Shell meta.hostname: http://[::]:8080 meta.version: 1.23.2 meta.modules: {generative-cohere={documentationHref=https://docs.cohere.com/reference/generate, name=Generative Search - Cohere}, generative-openai={documentationHref=https://platform.openai.com/docs/api-reference/completions, name=Generative Search - OpenAI}, generative-palm={documentationHref=https://cloud.google.com/vertex-ai/docs/generative-ai/chat/test-chat-prompts, name=Generative Search - Google PaLM}, qna-openai={documentationHref=https://platform.openai.com/docs/api-reference/completions, name=OpenAI Question & Answering Module}, ref2vec-centroid={}, reranker-cohere={documentationHref=https://txt.cohere.com/rerank/, name=Reranker - Cohere}, text2vec-cohere={documentationHref=https://docs.cohere.ai/embedding-wiki/, name=Cohere Module}, text2vec-huggingface={documentationHref=https://huggingface.co/docs/api-inference/detailed_parameters#feature-extraction-task, name=Hugging Face Module}, text2vec-openai={documentationHref=https://platform.openai.com/docs/guides/embeddings/what-are-embeddings, name=OpenAI Module}, text2vec-palm={documentationHref=https://cloud.google.com/vertex-ai/docs/generative-ai/embeddings/get-text-embeddings, name=Google PaLM Module} The version is shown and the modules that were activated, this corresponds to the modules activated in the docker compose file. 3. Embed Documents In order to query the documents, the documents need to be embedded first. This can be done by means of the text2vec-transformers module. Create a new Docker Compose file with only the text2vec-transformers module enabled. You also set this module as DEFAULT_VECTORIZER_MODULE, set the TRANSFORMERS_INFERENCE_API to the transformer container and you use the sentence-transformers-all-MiniLM-L6-v2-onnx image for the transformer container. You use the ONNX image when you do not make use of a GPU. YAML version: '3.4' services: weaviate: command: - --host - 0.0.0.0 - --port - '8080' - --scheme - http image: semitechnologies/weaviate:1.23.2 ports: - 8080:8080 - 50051:50051 volumes: - weaviate_data:/var/lib/weaviate restart: on-failure:0 environment: QUERY_DEFAULTS_LIMIT: 25 AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true' PERSISTENCE_DATA_PATH: '/var/lib/weaviate' DEFAULT_VECTORIZER_MODULE: 'text2vec-transformers' ENABLE_MODULES: 'text2vec-transformers' TRANSFORMERS_INFERENCE_API: http://t2v-transformers:8080 CLUSTER_HOSTNAME: 'node1' t2v-transformers: image: semitechnologies/transformers-inference:sentence-transformers-all-MiniLM-L6-v2-onnx volumes: weaviate_data: Start the containers: Shell $ docker compose -f docker/compose-embed.yaml up Embedding the data is an important step that needs to be executed thoroughly. It is therefore important to know the Weaviate concepts. Every data object belongs to a Class, and a class has one or more Properties. A Class can be seen as a collection and every data object (represented as JSON-documents) can be represented by a vector (i.e. an embedding). 
Every Class contains objects which belong to this class, which corresponds to a common schema. Three markdown files with data of Bruce Springsteen are available. The embedding will be done as follows: Every markdown file will be converted to a Weaviate Class. A markdown file consists out of a header. The header contains the column names, which will be converted into Weaviate Properties. Properties need to be valid GraphQL names. Therefore, the column names have been altered a bit compared to the previous blog. E.g. writer(s) has become writers, album details has become AlbumDetails, etc. After the header, the data is present. Ever row in the table will be converted to a data object belonging to a Class. An example of a markdown file is the Compilation Albums file. Markdown | Title | US | AUS | CAN | GER | IRE | NLD | NZ | NOR | SWE | UK | |----------------------------------|----|-----|-----|-----|-----|-----|----|-----|-----|----| | Greatest Hits | 1 | 1 | 1 | 1 | 1 | 2 | 1 | 1 | 1 | 1 | | Tracks | 27 | 97 | — | 63 | — | 36 | — | 4 | 11 | 50 | | 18 Tracks | 64 | 98 | 58 | 8 | 20 | 69 | — | 2 | 1 | 23 | | The Essential Bruce Springsteen | 14 | 41 | — | — | 5 | 22 | — | 4 | 2 | 15 | | Greatest Hits | 43 | 17 | 21 | 25 | 2 | 4 | 3 | 3 | 1 | 3 | | The Promise | 16 | 22 | 27 | 1 | 4 | 4 | 30 | 1 | 1 | 7 | | Collection: 1973–2012 | — | 6 | — | 23 | 2 | 78 | 19 | 1 | 6 | — | | Chapter and Verse | 5 | 2 | 21 | 4 | 2 | 5 | 4 | 3 | 2 | 2 | In the next sections, the steps taken to embed the documents are explained in more detail. The complete source code is available at GitHub. This is not the most clean code, but I do hope it is understandable. 3.1 Basic Setup A map is created, which contains the file names linked to the Weaviate Class names to be used. Java private static Map<String, String> documentNames = Map.of( "bruce_springsteen_list_of_songs_recorded.md", "Songs", "bruce_springsteen_discography_compilation_albums.md", "CompilationAlbums", "bruce_springsteen_discography_studio_albums.md", "StudioAlbums"); In the basic setup, a connection is set up to Weaviate, all data is removed from the database, and the files are read. Every file is then processed one by one. Java Config config = new Config("http", "localhost:8080"); WeaviateClient client = new WeaviateClient(config); // Remove existing data Result<Boolean> deleteResult = client.schema().allDeleter().run(); if (deleteResult.hasErrors()) { System.out.println(new GsonBuilder().setPrettyPrinting().create().toJson(deleteResult.getResult())); } List<Document> documents = loadDocuments(toPath("markdown-files")); for (Document document : documents) { ... } 3.2 Convert Header to Class The header information needs to be converted to a Weaviate Class. Split the complete file row by row. The first line contains the header, split it by means of the | separator and store it in variable tempSplittedHeader. The header starts with a | and therefore the first entry in tempSplittedHeader is empty. Remove it and store the remaining part of the row in variable splittedHeader. For every item in splittedHeader (i.e. the column names), a Weaviate Property is created. Strip all leading and trailing spaces from the data. Create the Weaviate documentClass with the class name as defined in the documentNames map and the just created Properties. Add the class to the schema and verify the result. 
Java // Split the document line by line String[] splittedDocument = document.text().split("\n"); // split the header on | and remove the first item (the line starts with | and the first item is therefore empty) String[] tempSplittedHeader = splittedDocument[0].split("\\|"); String[] splittedHeader = Arrays.copyOfRange(tempSplittedHeader,1, tempSplittedHeader.length); // Create the Weaviate collection, every item in the header is a Property ArrayList<Property> properties = new ArrayList<>(); for (String splittedHeaderItem : splittedHeader) { Property property = Property.builder().name(splittedHeaderItem.strip()).build(); properties.add(property); } WeaviateClass documentClass = WeaviateClass.builder() .className(documentNames.get(document.metadata("file_name"))) .properties(properties) .build(); // Add the class to the schema Result<Boolean> collectionResult = client.schema().classCreator() .withClass(documentClass) .run(); if (collectionResult.hasErrors()) { System.out.println("Creation of collection failed: " + documentNames.get(document.metadata("file_name"))); } 3.3 Convert Data Rows to Objects Every data row needs to be converted to a Weaviate data object. Copy the rows containing data in variable dataOnly. Loop over every row, a row is represented by variable documentLine. Split every line by means of the | separator and store it in variable tempSplittedDocumentLine. Just like the header, every row starts with a |, and therefore, the first entry in tempSplittedDocumentLine is empty. Remove it and store the remaining part of the row in variable splittedDocumentLine. Every item in the row becomes a property. The complete row is converted to properties in variable propertiesDocumentLine. Strip all leading and trailing spaces from the data. Add the data object to the Class and verify the result. At the end, print the result. Java // Preserve only the rows containing data, the first two rows contain the header String[] dataOnly = Arrays.copyOfRange(splittedDocument, 2, splittedDocument.length); for (String documentLine : dataOnly) { // split a data row on | and remove the first item (the line starts with | and the first item is therefore empty) String[] tempSplittedDocumentLine = documentLine.split("\\|"); String[] splittedDocumentLine = Arrays.copyOfRange(tempSplittedDocumentLine, 1, tempSplittedDocumentLine.length); // Every item becomes a property HashMap<String, Object> propertiesDocumentLine = new HashMap<>(); int i = 0; for (Property property : properties) { propertiesDocumentLine.put(property.getName(), splittedDocumentLine[i].strip()); i++; } Result<WeaviateObject> objectResult = client.data().creator() .withClassName(documentNames.get(document.metadata("file_name"))) .withProperties(propertiesDocumentLine) .run(); if (objectResult.hasErrors()) { System.out.println("Creation of object failed: " + propertiesDocumentLine); } String json = new GsonBuilder().setPrettyPrinting().create().toJson(objectResult.getResult()); System.out.println(json); } 3.4 The Result Running the code to embed the documents prints what is stored in the Weaviate vector database. As you can see below, a data object has a UUID, the class is StudioAlbums, the properties are listed and the corresponding vector is displayed. 
Shell { "id": "e0d5e1a3-61ad-401d-a264-f95a9a901d82", "class": "StudioAlbums", "creationTimeUnix": 1705842658470, "lastUpdateTimeUnix": 1705842658470, "properties": { "aUS": "3", "cAN": "8", "gER": "1", "iRE": "2", "nLD": "1", "nOR": "1", "nZ": "4", "sWE": "1", "title": "Only the Strong Survive", "uK": "2", "uS": "8" }, "vector": [ -0.033715352, -0.07489116, -0.015459526, -0.025204511, ... 0.03576842, -0.010400549, -0.075309984, -0.046005197, 0.09666792, 0.0051724687, -0.015554721, 0.041699238, -0.09749843, 0.052182134, -0.0023900834 ] } 4. Manage Collections So, now you have data in the vector database. What kind of information can be retrieved from the database? You are able to manage the collection, for example. 4.1 Retrieve Collection Definition The definition of a collection can be retrieved as follows: Java String className = "CompilationAlbums"; Result<WeaviateClass> result = client.schema().classGetter() .withClassName(className) .run(); String json = new GsonBuilder().setPrettyPrinting().create().toJson(result.getResult()); System.out.println(json); The output is the following: Shell { "class": "CompilationAlbums", "description": "This property was generated by Weaviate\u0027s auto-schema feature on Sun Jan 21 13:10:58 2024", "invertedIndexConfig": { "bm25": { "k1": 1.2, "b": 0.75 }, "stopwords": { "preset": "en" }, "cleanupIntervalSeconds": 60 }, "moduleConfig": { "text2vec-transformers": { "poolingStrategy": "masked_mean", "vectorizeClassName": true } }, "properties": [ { "name": "uS", "dataType": [ "text" ], "description": "This property was generated by Weaviate\u0027s auto-schema feature on Sun Jan 21 13:10:58 2024", "tokenization": "word", "indexFilterable": true, "indexSearchable": true, "moduleConfig": { "text2vec-transformers": { "skip": false, "vectorizePropertyName": false } } }, ... } You can see how it was vectorized, the properties, etc. 4.2 Retrieve Collection Objects Can you also retrieve the collection objects? Yes, you can, but this is not possible at the moment of writing with the java client library. You will notice, when browsing the Weaviate documentation, that there is no example code for the java client library. However, you can make use of the GraphQL API which can also be called from java code. The code to retrieve the title property of every data object in the CompilationAlbums Class is the following: You call the graphQL method from the Weaviate client. You define the Weaviate Class and the fields you want to retrieve. You print the result. Java Field song = Field.builder().name("title").build(); Result<GraphQLResponse> result = client.graphQL().get() .withClassName("CompilationAlbums") .withFields(song) .run(); if (result.hasErrors()) { System.out.println(result.getError()); return; } System.out.println(result.getResult()); The result shows you all the titles: Shell GraphQLResponse( data={ Get={ CompilationAlbums=[ {title=Chapter and Verse}, {title=The Promise}, {title=Greatest Hits}, {title=Tracks}, {title=18 Tracks}, {title=The Essential Bruce Springsteen}, {title=Collection: 1973–2012}, {title=Greatest Hits} ] } }, errors=null) 5. Semantic Search The whole purpose of embedding the documents is to verify whether you can search the documents. In order to search, you also need to make use of the GraphQL API. Different search operators are available. Just like in the previous blog, 5 questions are asked about the data. on which album was “adam raised a cain” originally released?The answer is “Darkness on the Edge of Town”. 
what is the highest chart position of “Greetings from Asbury Park, N.J.” in the US?This answer is #60. what is the highest chart position of the album “tracks” in canada?The album did not have a chart position in Canada. in which year was “Highway Patrolman” released?The answer is 1982. who produced “all or nothin’ at all”?The answer is Jon Landau, Chuck Plotkin, Bruce Springsteen and Roy Bittan. In the source code, you provide the class name and the corresponding fields. This information is added in a static class for each collection. The code contains the following: Create a connection to Weaviate. Add the fields of the class and also add two additional fields, the certainty and the distance. Embed the question using a NearTextArgument. Search the collection via the GraphQL API, limit the result to 1. Print the result. Java private static void askQuestion(String className, Field[] fields, String question) { Config config = new Config("http", "localhost:8080"); WeaviateClient client = new WeaviateClient(config); Field additional = Field.builder() .name("_additional") .fields(Field.builder().name("certainty").build(), // only supported if distance==cosine Field.builder().name("distance").build() // always supported ).build(); Field[] allFields = Arrays.copyOf(fields, fields.length + 1); allFields[fields.length] = additional; // Embed the question NearTextArgument nearText = NearTextArgument.builder() .concepts(new String[]{question}) .build(); Result<GraphQLResponse> result = client.graphQL().get() .withClassName(className) .withFields(allFields) .withNearText(nearText) .withLimit(1) .run(); if (result.hasErrors()) { System.out.println(result.getError()); return; } System.out.println(result.getResult()); } Invoke this method for the five questions. Java askQuestion(Song.NAME, Song.getFields(), "on which album was \"adam raised a cain\" originally released?"); askQuestion(StudioAlbum.NAME, StudioAlbum.getFields(), "what is the highest chart position of \"Greetings from Asbury Park, N.J.\" in the US?"); askQuestion(CompilationAlbum.NAME, CompilationAlbum.getFields(), "what is the highest chart position of the album \"tracks\" in canada?"); askQuestion(Song.NAME, Song.getFields(), "in which year was \"Highway Patrolman\" released?"); askQuestion(Song.NAME, Song.getFields(), "who produced \"all or nothin' at all?\""); The result is amazing, for all five questions the correct data object is returned. 
Shell GraphQLResponse( data={ Get={ Songs=[ {_additional={certainty=0.7534831166267395, distance=0.49303377}, originalRelease=Darkness on the Edge of Town, producers=Jon Landau Bruce Springsteen Steven Van Zandt (assistant), song="Adam Raised a Cain", writers=Bruce Springsteen, year=1978} ] } }, errors=null) GraphQLResponse( data={ Get={ StudioAlbums=[ {_additional={certainty=0.803815484046936, distance=0.39236903}, aUS=71, cAN=—, gER=—, iRE=—, nLD=—, nOR=—, nZ=—, sWE=35, title=Greetings from Asbury Park,N.J., uK=41, uS=60} ] } }, errors=null) GraphQLResponse( data={ Get={ CompilationAlbums=[ {_additional={certainty=0.7434340119361877, distance=0.513132}, aUS=97, cAN=—, gER=63, iRE=—, nLD=36, nOR=4, nZ=—, sWE=11, title=Tracks, uK=50, uS=27} ] } }, errors=null) GraphQLResponse( data={ Get={ Songs=[ {_additional={certainty=0.743279218673706, distance=0.51344156}, originalRelease=Nebraska, producers=Bruce Springsteen, song="Highway Patrolman", writers=Bruce Springsteen, year=1982} ] } }, errors=null) GraphQLResponse( data={ Get={ Songs=[ {_additional={certainty=0.7136414051055908, distance=0.5727172}, originalRelease=Human Touch, producers=Jon Landau Chuck Plotkin Bruce Springsteen Roy Bittan, song="All or Nothin' at All", writers=Bruce Springsteen, year=1992} ] } }, errors=null) 6. Explore Collections The semantic search implementation assumed that you knew in which collection to search the answer. Most of the time, you do not know which collection to search for. The explore function can help in order to search across multiple collections. There are some limitations to the use of the explore function: Only one vectorizer module may be enabled. The vector search must be nearText or nearVector. The askQuestion method becomes the following. Just like in the previous paragraph, you want to return some additional, more generic fields of the collection. The question is embedded in a NearTextArgument and the collections are explored. Java private static void askQuestion(String question) { Config config = new Config("http", "localhost:8080"); WeaviateClient client = new WeaviateClient(config); ExploreFields[] fields = new ExploreFields[]{ ExploreFields.CERTAINTY, // only supported if distance==cosine ExploreFields.DISTANCE, // always supported ExploreFields.BEACON, ExploreFields.CLASS_NAME }; NearTextArgument nearText = NearTextArgument.builder().concepts(new String[]{question}).build(); Result<GraphQLResponse> result = client.graphQL().explore() .withFields(fields) .withNearText(nearText) .run(); if (result.hasErrors()) { System.out.println(result.getError()); return; } System.out.println(result.getResult()); } Running this code returns an error. A bug is reported, because a vague error is returned. 
Shell GraphQLResponse(data={Explore=null}, errors=[GraphQLError(message=runtime error: invalid memory address or nil pointer dereference, path=[Explore], locations=[GraphQLErrorLocationsItems(column=2, line=1)])]) GraphQLResponse(data={Explore=null}, errors=[GraphQLError(message=runtime error: invalid memory address or nil pointer dereference, path=[Explore], locations=[GraphQLErrorLocationsItems(column=2, line=1)])]) GraphQLResponse(data={Explore=null}, errors=[GraphQLError(message=runtime error: invalid memory address or nil pointer dereference, path=[Explore], locations=[GraphQLErrorLocationsItems(column=2, line=1)])]) GraphQLResponse(data={Explore=null}, errors=[GraphQLError(message=runtime error: invalid memory address or nil pointer dereference, path=[Explore], locations=[GraphQLErrorLocationsItems(column=2, line=1)])]) GraphQLResponse(data={Explore=null}, errors=[GraphQLError(message=runtime error: invalid memory address or nil pointer dereference, path=[Explore], locations=[GraphQLErrorLocationsItems(column=2, line=1)])]) However, in order to circumvent this error, it would be interesting to verify whether the correct answer returns the highest certainty over all collections. Therefore, for each question every collection is queried. The complete code can be found here, below only the code for question 1 is shown. The askQuestion implementation is the one used in the Semantic Search paragraph. Java private static void question1() { askQuestion(Song.NAME, Song.getFields(), "on which album was \"adam raised a cain\" originally released?"); askQuestion(StudioAlbum.NAME, StudioAlbum.getFields(), "on which album was \"adam raised a cain\" originally released?"); askQuestion(CompilationAlbum.NAME, CompilationAlbum.getFields(), "on which album was \"adam raised a cain\" originally released?"); } Running this code returns the following output. Shell GraphQLResponse(data={Get={Songs=[{_additional={certainty=0.7534831166267395, distance=0.49303377}, originalRelease=Darkness on the Edge of Town, producers=Jon Landau Bruce Springsteen Steven Van Zandt (assistant), song="Adam Raised a Cain", writers=Bruce Springsteen, year=1978}]}, errors=null) GraphQLResponse(data={Get={StudioAlbums=[{_additional={certainty=0.657206118106842, distance=0.68558776}, aUS=9, cAN=7, gER=—, iRE=73, nLD=4, nOR=12, nZ=11, sWE=9, title=Darkness on the Edge of Town, uK=14, uS=5}]}, errors=null) GraphQLResponse(data={Get={CompilationAlbums=[{_additional={certainty=0.6488107144832611, distance=0.7023786}, aUS=97, cAN=—, gER=63, iRE=—, nLD=36, nOR=4, nZ=—, sWE=11, title=Tracks, uK=50, uS=27}]}, errors=null) The interesting parts here are the certainties: Collection Songs has a certainty of 0.75 Collection StudioAlbums has a certainty of 0.62 Collection CompilationAlbums has a certainty of 0.64 The correct answer can be found in the collection of songs that has the highest certainty. So, this is great. When you verify this for the other questions, you will see that the collection containing the correct answer, always has the highest certainty. Conclusion In this post, you transformed the source documents in order to fit in a vector database. The semantic search results are amazing. In the previous posts, it was kind of a struggle to retrieve the correct answers to the questions. By restructuring the data and by only using a vector semantic search a 100% score of correct answers has been achieved.
Have you ever wished for a coding assistant that could help you write code faster, reduce errors, and improve your overall productivity? In this article, I'll share my journey and experiences with GitHub Copilot, a coding companion, and how it has boosted productivity. The article is specifically focused on the IntelliJ IDE, which we use for building Java Spring-based microservices. Six months ago, I embarked on a journey to explore GitHub Copilot, an AI-powered coding assistant, while working on Java Spring Microservices projects in IntelliJ IDEA. At first, my experience was not so good. I found the suggestions it provided to be inappropriate, and it seemed to hinder rather than help development work. But I decided to persist with the tool, and today I am reaping some of the benefits, although there is still a lot of scope for improvement. Common Patterns Let's dive into some scenarios where GitHub Copilot has played a vital role. Exception Handling Consider the following method: Java private boolean isLoanEligibleForPurchaseBasedOnAllocation(LoanInfo loanInfo, PartnerBank partnerBank){ boolean result = false; try { if (loanInfo != null && loanInfo.getFico() != null) { Integer fico = loanInfo.getFico(); // Removed further code for brevity } else { logger.error("ConfirmFundingServiceImpl::isLoanEligibleForPurchaseBasedOnAllocation - Loan info is null or FICO is null"); } } catch (Exception ex) { logger.error("ConfirmFundingServiceImpl::isLoanEligibleForPurchaseBasedOnAllocation - An error occurred while checking loan eligibility for purchase based on allocation, detail error:", ex); } return result; } Initially, without GitHub Copilot, we would have to manually add the exception handling code. However, with Copilot, as soon as we added the try block and started adding catch blocks, it automatically suggested the logger message and generated the entire catch block. None of the content in the catch block was typed manually. Additionally, the logger.error statement in the else branch was prefilled automatically by Copilot as soon as we started typing logger.error. Mocks for Unit Tests In unit testing, we often need to create mock objects. Consider the scenario where we need to create a list of PartnerBankFundingAllocation objects: Java List<PartnerBankFundingAllocation> partnerBankFundingAllocations = new ArrayList<>(); when(this.fundAllocationRepository.getPartnerBankFundingAllocation(partnerBankObra.getBankId(), "Fico")).thenReturn(partnerBankFundingAllocations); If we create a single object and push it to the list: Java PartnerBankFundingAllocation partnerBankFundingAllocation = new PartnerBankFundingAllocation(); partnerBankFundingAllocation.setBankId(9); partnerBankFundingAllocation.setScoreName("Fico"); partnerBankFundingAllocation.setScoreMin(680); partnerBankFundingAllocation.setScoreMax(1000); partnerBankFundingAllocations.add(partnerBankFundingAllocation); GitHub Copilot automatically suggests code for the remaining objects. We just need to keep hitting enter and adjust values if the suggestions are inappropriate. Java PartnerBankFundingAllocation partnerBankFundingAllocation2 = new PartnerBankFundingAllocation(); partnerBankFundingAllocation2.setBankId(9); partnerBankFundingAllocation2.setScoreName("Fico"); partnerBankFundingAllocation2.setScoreMin(660); partnerBankFundingAllocation2.setScoreMax(679); partnerBankFundingAllocations.add(partnerBankFundingAllocation2); Logging/Debug Statements GitHub Copilot also excels in helping with logging and debugging statements.
Consider the following code snippet: Java if (percentage < allocationPercentage){ result = true; logger.info("ConfirmFundingServiceImpl::isLoanEligibleForPurchaseBasedOnAllocation - Loan is eligible for purchase"); } else{ logger.info("ConfirmFundingServiceImpl::isLoanEligibleForPurchaseBasedOnAllocation - Loan is not eligible for purchase"); } In this example, all the logger information statements are auto-generated by GitHub Copilot. It takes into account the context of the code condition and suggests relevant log messages. Code Commenting It helps in adding comments at the top of the method. In the code snippet below, the comment above the method is generated by the Copilot. We just need to start typing in // This method. Java // THis method is used to get the loan program based on the product sub type public static String getLoanProgram(List<Product> products, Integer selectedProductId) { String loanProgram = ""; if (products != null && products.size() > 0) { Product product = products.stream().filter(p -> p.getProductId().equals(selectedProductId)).findFirst().orElse(null); if (product != null) { String productSubType = product.getProductSubType(); switch (productSubType) { case "STANDARD": loanProgram = "Standard"; break; case "PROMO": loanProgram = "Promo"; break; default: loanProgram = "NA"; break; } } } return loanProgram; } Alternatively, we can use a prompt like // Q : What is this method doing?. Copilot will add the second line, // A : This method is used to log the payload for the given api name. Java // Q : What is this method doing? // A : This method is used to log the payload for the given api name public static void logPayload(String apiName, Object payload) { try { if (payload != null && apiName != null && apiName.trim().length() > 0) { ObjectMapper mapper = new ObjectMapper(); String payloadResponse = mapper.writeValueAsString(payload); logger.info("UnderwritingUtility::logPayload - For api : " + apiName + ", payload : " + payloadResponse); } else { logger.error("UnderwritingUtility::logPayload - Either object was null of api name was null or empty"); } } catch (Exception ex) { logger.error("UnderwritingUtility::logPayload - An error occurred while logging the payload, detail error : ", ex); } } Another example of a different method we type in a prompt: // Q : What is this method doing?. Copilot will add the second line, // A : This method is used to validate the locale from request, if locale is not valid then set the default locale. Java //Q - Whats below method doing? 
//A - This method is used to validate the locale from request, if locale is not valid then set the default locale public static boolean isLocaleValid(LoanQuoteRequest loanQuoteRequest){ boolean result = false; try{ if (org.springframework.util.StringUtils.hasText(loanQuoteRequest.getLocale())){ String localeStr = loanQuoteRequest.getLocale(); logger.info("UnderwritingUtility::validateLocale - Locale from request : " + localeStr); Locale locale = new Locale.Builder().setLanguageTag(localeStr).build(); // Get the language part String language = locale.getLanguage(); if (language.equalsIgnoreCase("en")){ result = true; if (!localeStr.equalsIgnoreCase(UwConstants.DEFAULT_LOCALE_CODE)){ loanQuoteRequest.setLocale(UwConstants.DEFAULT_LOCALE_CODE); } } else if (language.equalsIgnoreCase("es")){ result = true; if (!localeStr.equalsIgnoreCase(UwConstants.SPANISH_LOCALE_CODE)){ loanQuoteRequest.setLocale(UwConstants.SPANISH_LOCALE_CODE); } } } else{ result = true; loanQuoteRequest.setLocale(UwConstants.DEFAULT_LOCALE_CODE); } } catch (Exception ex){ logger.error("UnderwritingUtility::validateLocale - An error occurred, detail error : ", ex); } return result; } Closing Thoughts The benefits of using GitHub Copilot in IntelliJ for Java Spring Microservices development are significant. It saves time, reduces errors, and allows us to focus on core business logic instead of writing repetitive code. As we embark on our coding journey with GitHub Copilot, here are a few tips: Be patient and give it some time to learn and identify common coding patterns that we follow. Keep an eye on the suggestions and adjust them as needed. Sometimes, it hallucinates. Experiment with different scenarios to harness the full power of Copilot. Stay updated with Copilot's improvements and updates to make the most of this cutting-edge tool. We can use this in combination with ChatGPT. Here is an article on how it can help boost our development productivity. Happy coding with GitHub Copilot!
The Spring AI is a new project of the Spring ecosystem that streamlines the creation of AI applications in Java. By using Spring AI together with PostgreSQL pgvector, you can build generative AI applications that draw insights from your data. First, this article introduces you to the Spring AI ChatClient that uses the OpenAI GPT-4 model to generate recommendations based on user prompts. Next, the article shows how to deploy PostgreSQL with the PGVector extension and perform vector similarity searches using the Spring AI EmbeddingClient and Spring JdbcClient. Adding Spring AI Dependency Spring AI supports many large language model (LLM) providers, with each LLM having its own Spring AI dependency. Let's assume that you prefer working with OpenAI models and APIs. Then, you need to add the following dependency to a project: XML <dependency> <groupId>org.springframework.ai</groupId> <artifactId>spring-ai-openai-spring-boot-starter</artifactId> <version>{latest.version}</version> </dependency> Also, at the time of writing, Spring AI was in active development, with the framework artifacts being released in the Spring Milestone and/or Snapshot repositories. Thus, if you still can't find Spring AI on https://start.spring.io/, then add the repositories to the pom.xml file: XML <repositories> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> <repository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/snapshot</url> <releases> <enabled>false</enabled> </releases> </repository> </repositories> Setting Up OpenAI Module The OpenAI module comes with several configuration properties, allowing the management of connectivity-related settings and fine-tuning the behavior of OpenAI models. At a minimum, you need to provide your OpenAI API key, which will be used by Spring AI to access GPT and embedding models. Once the key is created, add it to the application.properties file: Properties files spring.ai.openai.api-key=sk-... Then, if necessary, you can select particular GPT and embedding models: Properties files spring.ai.openai.chat.model=gpt-4 spring.ai.openai.embedding.model=text-embedding-ada-002 In the end, you can test that the OpenAI module is configured properly by implementing a simple assistant with Spring AI's ChatClient: Java // Inject the ChatClient bean @Autowired private ChatClient aiClient; // Create a system message for ChatGPT explaining the task private static final SystemMessage SYSTEM_MESSAGE = new SystemMessage( """ You're an assistant who helps to find lodging in San Francisco. Suggest three options. Send back a JSON object in the format below. [{\"name\": \"<hotel name>\", \"description\": \"<hotel description>\", \"price\": <hotel price>}] Don't add any other text to the response. Don't add the new line or any other symbols to the response. Send back the raw JSON. 
"""); public void searchPlaces(String prompt) { // Create a Spring AI prompt with the system message and the user message Prompt chatPrompt = new Prompt(List.of(SYSTEM_MESSAGE, new UserMessage(prompt))); // Send the prompt to ChatGPT and get the response ChatResponse response = aiClient.generate(chatPrompt); // Get the raw JSON from the response and print it String rawJson = response.getGenerations().get(0).getContent(); System.out.println(rawJson); } For the sake of the experiment, if you pass the "I'd like to stay near the Golden Gate Bridge" prompt, then the searchPlaces the method might provide lodging recommendations as follows: JSON [ {"name": "Cavallo Point", "description": "Historic hotel offering refined rooms, some with views of the Golden Gate Bridge, plus a spa & dining.", "price": 450}, {"name": "Argonaut Hotel", "description": "Upscale, nautical-themed hotel offering Golden Gate Bridge views, plus a seafood restaurant.", "price": 300}, {"name": "Hotel Del Sol", "description": "Colorful, retro hotel with a pool, offering complimentary breakfast & an afternoon cookies reception.", "price": 200} ] Starting Postgres With PGVector If you run the previous code snippet with the ChatClient, you'll notice that it usually takes over 10 seconds for the OpenAI GPT model to generate a response. The model has a broad and deep knowledge base, and it takes time to produce a relevant response. Apart from the high latency, the GPT model might not have been trained on data that is relevant to your application workload. Thus, it might generate responses that are far from being satisfactory for the user. However, you can always expedite the search and provide users with accurate responses if you generate embeddings on a subset of your data and then let Postgres work with those embeddings. The pgvector extension allows storing and querying vector embeddings in Postgres. The easiest way to start with PGVector is by starting a Postgres instance with the extension in Docker: Shell mkdir ~/postgres-volume/ docker run --name postgres \ -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=password \ -p 5432:5432 \ -v ~/postgres-volume/:/var/lib/postgresql/data -d ankane/pgvector:latest Once started, you can connect to the container and enable the extension by executing the CREATE EXTENSION vector statement: Shell docker exec -it postgres psql -U postgres -c 'CREATE EXTENSION vector' Lastly, add the Postgres JDBC driver dependency to the pom.xml file: XML <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>{latest.version}</version> </dependency> Configure the Spring DataSource by adding the following settings to the application.properties file: Properties files spring.datasource.url = jdbc:postgresql://127.0.0.1:5432/postgres spring.datasource.username = postgres spring.datasource.password = password Performing Vector Similarity Search With Spring AI At a minimum, the vector similarity search is a two-step process. First, you need to use an embedding model to generate a vector/embedding for a provided user prompt or other text. Spring AI supports the EmbeddingClient that connects to OpenAI's or other providers' embedding models and generates a vectorized representation for the text input: Java // Inject the Spring AI Embedding client @Autowired private EmbeddingClient aiClient; public List<Place> searchPlaces(String prompt) { // Use the Embedding client to generate a vector for the user prompt List<Double> promptEmbedding = aiClient.embed(prompt); ... 
} Second, you use the generated embedding to perform a similarity search across vectors stored in the Postgres database. For instance, you can use the Spring JdbcClient for this task: Java @Autowired private JdbcClient jdbcClient; // Inject the Spring AI Embedding client @Autowired private EmbeddingClient aiClient; public List<Place> searchPlaces(String prompt) { // Use the Embedding client to generate a vector for the user prompt List<Double> promptEmbedding = aiClient.embed(prompt); // Perform the vector similarity search StatementSpec query = jdbcClient.sql( "SELECT name, description, price " + "FROM airbnb_listing WHERE 1 - (description_embedding <=> :user_prompt::vector) > 0.7 " + "ORDER BY description_embedding <=> :user_prompt::vector LIMIT 3") .param("user_prompt", promptEmbedding.toString()); // Return the recommended places return query.query(Place.class).list(); } The description_embedding column stores embeddings that were pre-generated for Airbnb listing overviews from the description column. The Airbnb embeddings were produced by the same model that is used by Spring AI's EmbeddingClient for the user prompts. Postgres uses PGVector to calculate the cosine distance (<=>) between the Airbnb and user prompt embeddings (description_embedding <=> :user_prompt::vector) and then returns only those Airbnb listings whose description is > 0.7 similar to the provided user prompt. The similarity is measured as a value in the range from 0 to 1. The closer the similarity is to 1, the more related the vectors are. What's Next Spring AI and PostgreSQL PGVector provide all the essential capabilities needed for building generative AI applications in Java. If you're curious to learn more, watch this hands-on tutorial. It guides you through the process of creating a lodging recommendation service in Java from scratch, optimizing similarity searches with specialized indexes, and scaling with distributed Postgres (YugabyteDB).
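The snippet above relies on the description_embedding column being filled in advance, but that step is not shown. Below is a rough sketch of how it could be done with the same EmbeddingClient and JdbcClient; the loader class and the exact airbnb_listing columns are assumptions for illustration, not part of the original tutorial.
Java
import org.springframework.ai.embedding.EmbeddingClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.simple.JdbcClient;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Component
public class ListingEmbeddingLoader {

    @Autowired
    private JdbcClient jdbcClient;

    @Autowired
    private EmbeddingClient aiClient;

    // One-off loader: embed every listing description and store the vector in pgvector.
    public void embedListings() {
        List<Map<String, Object>> listings = jdbcClient
                .sql("SELECT id, description FROM airbnb_listing WHERE description_embedding IS NULL")
                .query()
                .listOfRows();

        for (Map<String, Object> listing : listings) {
            // Same embedding model as the one used later for user prompts
            List<Double> embedding = aiClient.embed((String) listing.get("description"));

            jdbcClient.sql("UPDATE airbnb_listing " +
                            "SET description_embedding = :embedding::vector WHERE id = :id")
                    .param("embedding", embedding.toString())
                    .param("id", listing.get("id"))
                    .update();
        }
    }
}
The Place type passed to query(Place.class) can then be a simple record or POJO whose name, description, and price properties match the selected columns.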
Distributed computing, also known as distributed processing, involves connecting numerous computer servers through a network to form a cluster. This cluster, known as a "distributed system," enables the sharing of data and coordinating processing power. Distributed computing provides many benefits, such as: Scalability utilizing a "scale-out architecture" Enhanced performance leveraging parallelism Increased resilience by employing redundancy Cost-effectiveness by utilizing low-cost, commodity hardware There are two main advantages of distributed computing: Utilizing the collective processing capabilities of a clustered system Minimizing network hops by conducting computations on the cluster processing the data In this blog post, we explore how Hazelcast simplifies distributed computing (both as a self-managed and managed service). Hazelcast offers three solutions for distributed computing, depending on the use case: Option #1: Entry Processor An entry processor is a functionality that executes your code on a map entry in a manner that ensures atomicity. So you can update, remove, and read map entries on cluster members (servers). This is a good option to perform bulk processing on an IMap. How To Create EntryProcessor Java public class IncrementingEntryProcessor implements EntryProcessor<Integer, Integer, Integer> { public Integer process( Map.Entry<Integer, Integer> entry ) { Integer value = entry.getValue(); entry.setValue( value + 1 ); return value + 1; } @Override public EntryProcessor<Integer, Integer, Integer> getBackupProcessor() { return IncrementingEntryProcessor.this; } } How To Use EntryProcessor Java IMap<Integer, Integer> map = hazelcastInstance.getMap( "myMap" ); for ( int i = 0; i < 100; i++ ) { map.put( i, i ); } Map<Integer, Object> res = map.executeOnEntries( new IncrementingEntryProcessor() ); How To Optimize EntryProcessor Performance Offloadable refers to the capability of transferring execution from the partition thread to an executor thread. ReadOnly indicates the ability to refrain from acquiring a lock on the key. You can learn more about the entry processor in our documentation. Option #2: Java Executor Service Simply put, you can run your Java code on cluster members and obtain the resulting output. Java boasts a standout feature in its Executor framework, enabling the asynchronous execution of tasks, such as database queries, intricate calculations, and image rendering. In the Java Executor framework, you implement tasks in two ways: Callable or Runnable. Similarly, using Hazelcast, you can implement tasks in two ways: Callable or Runnable. 
How To Implement a Callable Task Java public class SumTask implements Callable<Integer>, Serializable, HazelcastInstanceAware { private transient HazelcastInstance hazelcastInstance; public void setHazelcastInstance( HazelcastInstance hazelcastInstance ) { this.hazelcastInstance = hazelcastInstance; } public Integer call() throws Exception { IMap<String, Integer> map = hazelcastInstance.getMap( "map" ); int result = 0; for ( String key : map.localKeySet() ) { System.out.println( "Calculating for key: " + key ); result += map.get( key ); } System.out.println( "Local Result: " + result ); return result; } } How To Implement a Runnable Task Java public class EchoTask implements Runnable, Serializable { private final String msg; public EchoTask( String msg ) { this.msg = msg; } @Override public void run() { try { Thread.sleep( 5000 ); } catch ( InterruptedException e ) { } System.out.println( "echo:" + msg ); } } How To Scale the Executor Service To scale up, you should improve the processing capacity of the cluster member (JVM). You can do this by increasing the pool-size property mentioned in Configuring Executor Service (i.e., increasing the thread count). However, please be aware of your member’s capacity. If you think it cannot handle an additional load caused by increasing the thread count, you may want to consider improving the member’s resources (CPU, memory, etc.). For example, set the pool size to 5 and run the above MasterMember. You will see that EchoTask is run as soon as it is produced. To scale out, add more members instead of increasing only one member’s capacity. You may want to expand your cluster by adding more physical or virtual machines. For example, in the EchoTask scenario in the Runnable section, you can create another Hazelcast instance that automatically gets involved in the executions started in MasterMember and start processing. You can read more about the Executor Service in our documentation. Option #3: Pipeline Develop a data pipeline capable of swiftly executing batch or streaming processes across cluster participants. It consists of three elements: one or more sources, processing stages, and at least one sink. Depending on the data source, pipelines can be applied for various use cases, such as stream processing on a continuous stream of data (i.e., events) to provide results in real-time as the data is generated, or, in the form of batch processing of a fixed amount of static data for routine tasks, such as generating daily reports. In Hazelcast, pipelines can be defined using either SQL or the Jet API. Below are a few more resources: SQL pipelines Jet API pipelines Pipelines to enrich streams demo When Should You Choose Entry Processor, Executor Service, or Pipeline? Opting for an entry processor is a suitable choice when conducting bulk processing on a map. Typically, this involves iterating through a loop of keys, retrieving the value with map.get(key), modifying the value, and ultimately reintegrating the entry into the map using map.put(key, value). The executor service is well-suited for executing arbitrary Java code on your cluster members. Pipelines are well-suited for scenarios where you need to process multiple entries (i.e., aggregations or joins) or when multiple computing steps need to be performed in parallel. With pipelines, you can update maps based on the results of your computation using an entry processor sink. So here you have three options for distributed computing using Hazelcast. 
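The section above shows how the Callable and Runnable tasks are implemented, but not how they are handed to the cluster. Below is a minimal sketch of submitting them through Hazelcast's IExecutorService; the executor name and the standalone main class are illustrative assumptions, not part of the original examples.
Java
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MasterMember {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();

        // The distributed executor whose thread count is controlled by the pool-size property
        IExecutorService executorService = hazelcastInstance.getExecutorService("default");

        // Runnable task: fire-and-forget echo executed on some cluster member
        executorService.execute(new EchoTask("hello"));

        // Callable task: returns the partial sum computed on the executing member
        Future<Integer> result = executorService.submit(new SumTask());
        System.out.println("Result: " + result.get());

        hazelcastInstance.shutdown();
    }
}
Because SumTask implements HazelcastInstanceAware, the executing member injects its own HazelcastInstance before call() runs, so each member sums only the keys it owns locally.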
We look forward to your feedback and comments about this blog post! Share your experience on the Hazelcast GitHub repository.
In the ever-evolving landscape of Kubernetes (K8s), the introduction of AI-driven technologies continues to reshape the way we manage and optimize containerized applications. K8sGPT, a cutting-edge platform powered by artificial intelligence, takes center stage in this transformation. This article explores the key features, benefits, and potential applications of K8sGPT in the realm of Kubernetes orchestration. What Is K8sGPT? K8sGPT is an open-source, developer-friendly, innovative, AI-powered tool designed to enhance Kubernetes management and decision-making processes. It leverages advanced natural language processing (NLP) capabilities, offering insights, recommendations, and automation to streamline K8's operations. Key Features and Benefits AI-Driven Insights K8sGPT employs sophisticated NLP algorithms to analyze and interpret Kubernetes configurations, logs, and performance metrics. For example, it can understand user queries such as "k8sgpt analyze --explain" (Analyze the issues in the cluster) and provide actionable insights based on the analysis of the entire Kubernetes Cluster environment. Automated Optimization With the ability to understand the intricacies of Kubernetes environments, K8sGPT provides automated recommendations for resource allocation, scaling, and workload optimizations. For instance, it might suggest scaling down certain pods during periods of low traffic to save resources and costs. Enhanced Troubleshooting The platform excels in pinpointing and diagnosing issues within Kubernetes clusters, accelerating the troubleshooting process and reducing downtime. An example could be its ability to quickly identify and resolve pod bottlenecks or misconfigurations affecting application performance. Intuitive User Interface K8sGPT offers a user-friendly interface that facilitates seamless interaction with the AI models. Users can easily input queries, receive recommendations, and implement changes. The interface may include visualizations of cluster health, workload distribution, and suggested optimizations. Functionality of K8sGPT NLP-Powered Analysis K8sGPT uses NLP algorithms to comprehend natural language queries related to Kubernetes configurations, issues, and optimizations. K8sGPT can offer solutions to problems faced by developers, thereby allowing them to resolve issues more quickly. Users can use prompts like "What is the current state of my cluster?" and receive detailed, human-readable responses. Through its interactive functionality, K8sGPT can provide insights into the problems in a Kubernetes cluster and suggest potential solutions. Data Integration and Filters The platform integrates with Kubernetes clusters, accessing real-time data on configurations, performance, and logs. It seamlessly fetches data from various sources, ensuring a comprehensive view of the Kubernetes ecosystem. K8sGPT also offers integration with other tools. This integration provides the flexibility to use Kubernetes resources as filters. K8sGPT can generate a vulnerability report for the cluster and suggest solutions to address any security issues identified. This information can assist security teams in promptly remedying the vulnerabilities and maintaining a secure cluster. AI-Generated Insights K8sGPT processes the integrated data to generate insights, recommendations, and actionable steps for optimizing Kubernetes environments. For example, it might recommend redistributing workloads based on historical usage patterns for more efficient resource utilization. 
Applications of K8sGPT Continuous Optimization: K8sGPT ensures ongoing optimization by continuously monitoring Kubernetes clusters and adapting to changes in workload and demand. It can dynamically adjust resource allocations based on real-time traffic patterns and user-defined policies. Predictive Maintenance: K8sGPT can predict potential issues in a Kubernetes cluster based on historical performance data, helping to prevent downtime or reduce the impact of failures. Efficient Resource Management: The platform aids in the efficient allocation of resources, preventing under-utilization or over-provisioning of resources within Kubernetes clusters. For instance, it might suggest scaling up certain services during peak hours and scaling down during periods of inactivity. Fault Detection and Diagnosis: K8sGPT proactively identifies and addresses potential issues before they impact application performance, enhancing overall reliability. An example could be detecting abnormal pod behavior and triggering automated remediation steps to ensure continuous service availability. Capacity Planning: K8sGPT can help teams forecast future demand for Kubernetes resources and plan for capacity needs accordingly. Security and Compliance: K8sGPT can monitor Kubernetes clusters for potential security risks and provide recommendations to improve compliance with relevant regulations and standards. Real-World Use Cases E-commerce Scalability: In an e-commerce environment, K8sGPT can dynamically scale resources during flash sales to handle increased traffic and then scale down during normal periods, optimizing costs and ensuring a seamless customer experience. Healthcare Workload Management: In a healthcare application, K8sGPT can analyze patient data processing workloads, ensuring resources are allocated efficiently to handle critical real-time data while optimizing resource usage during non-peak hours. Finance Application Security: For a financial application, K8sGPT can continuously monitor and analyze security configurations, automatically recommending and implementing adjustments to enhance the overall security posture of the Kubernetes environment. Conclusion Kubernetes continues to be the cornerstone of container orchestration. K8sGPT emerges as a game-changer, introducing AI-driven capabilities to simplify management, enhance optimization, and provide valuable insights. Embracing K8sGPT positions organizations at the forefront of efficient, intelligent, and future-ready Kubernetes operations.
In a previous blog, we set up a Debezium server reading events from a PostgreSQL database. Then we streamed those changes to a Redis instance through a Redis stream. We might get the impression that to run Debezium we need to have two extra components running in our infrastructure: A standalone Debezium server instance. A software component with streaming capabilities and various integrations, such as Redis or Kafka. This is not always the case since Debezium can run in embedded mode. By running in embedded mode you use Debezium to read directly from a database’s transaction log. It is up to you how you are gonna handle the entries retrieved. The process of reading the entries from the transaction log can reside on any Java application thus there is no need for a standalone deployment. Apart from the number of components reduced, the other benefit is that we can alter the entries as we read them from the database and take action in our application. Sometimes we might just need a subset of the capabilities offered. Let’s use the same PostgreSQL configurations we used previously. Properties files listen_addresses = '*' port = 5432 max_connections = 20 shared_buffers = 128MB temp_buffers = 8MB work_mem = 4MB wal_level = logical max_wal_senders = 3 Also, we shall create an initialization script for the table we want to focus on. PLSQL #!/bin/bash set -e psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL create schema test_schema; create table test_schema.employee( id SERIAL PRIMARY KEY, firstname TEXT NOT NULL, lastname TEXT NOT NULL, email TEXT not null, age INT NOT NULL, salary real, unique(email) ); EOSQL Our Docker Compose file will look like this. Dockerfile version: '3.1' services: postgres: image: postgres restart: always environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres volumes: - ./postgresql.conf:/etc/postgresql/postgresql.conf - ./init:/docker-entrypoint-initdb.d command: - "-c" - "config_file=/etc/postgresql/postgresql.conf" ports: - 5432:5432 The configuration files we created are mounted onto the PostgreSQL Docker container. Docker Compose V2 is out there with many good features. Provided we run docker compose up, a Postgresql server with a schema and a table will be up and running. Also, that server will have logical decoding enabled and Debezium shall be able to track changes on that table through the transaction log. We have everything needed to proceed with building our application. 
First, let’s add the dependencies needed: XML <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <version.debezium>2.3.1.Final</version.debezium> <logback-core.version>1.4.12</logback-core.version> </properties> <dependencies> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-api</artifactId> <version>${version.debezium}</version> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-embedded</artifactId> <version>${version.debezium}</version> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-postgres</artifactId> <version>${version.debezium}</version> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-storage-jdbc</artifactId> <version>${version.debezium}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>${logback-core.version}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>${logback-core.version}</version> </dependency> </dependencies> We also need to create the Debezium embedded properties: Properties files name=embedded-debezium-connector connector.class=io.debezium.connector.postgresql.PostgresConnector offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore offset.flush.interval.ms=60000 database.hostname=127.0.0.1 database.port=5432 database.user=postgres database.password=postgres database.dbname=postgres database.server.name=embedded-debezium debezium.source.plugin.name=pgoutput plugin.name=pgoutput database.server.id=1234 topic.prefix=embedded-debezium schema.include.list=test_schema table.include.list=test_schema.employee Apart from establishing the connection towards the PostgreSQL database, we also decided to store the offset in a file. By using the offset, Debezium keeps track of the progress it makes in processing the events. On each change that happens on the table test_schema.employee, we shall receive an event. Once we receive that event, our codebase should handle it. To handle the events, we need to create a DebeziumEngine.ChangeConsumer. The ChangeConsumer will consume the events emitted. Java package com.egkatzioura; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.RecordChangeEvent; import org.apache.kafka.connect.source.SourceRecord; import java.util.List; public class CustomChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> { @Override public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException { for(RecordChangeEvent<SourceRecord> record: records) { System.out.println(record.record().toString()); committer.markProcessed(record); } committer.markBatchFinished(); } } Every incoming event will be printed on the console and then marked as processed, so that its offset can be committed. Now we can add our main class where we set up the engine.
Java package com.egkatzioura; import io.debezium.embedded.Connect; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.ChangeEventFormat; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.Properties; public class Application { public static void main(String[] args) throws IOException { Properties properties = new Properties(); try(final InputStream stream = Application.class.getClassLoader().getResourceAsStream("embedded_debezium.properties")) { properties.load(stream); } properties.put("offset.storage.file.filename",new File("offset.dat").getAbsolutePath()); var engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)) .using(properties) .notifying(new CustomChangeConsumer()) .build(); engine.run(); } } Provided our application is running as well as the PostgreSQL database we configured previously, we can start inserting data. SQL docker exec -it debezium-embedded-postgres-1 psql postgres postgres psql (15.3 (Debian 15.3-1.pgdg120+1)) Type "help" for help. postgres=# insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23); Also, we can see the change in the console. Shell SourceRecord{sourcePartition={server=embedded-debezium}, sourceOffset={last_snapshot_record=true, lsn=22518160, txId=743, ts_usec=1705916606794160, snapshot=true} ConnectRecord{topic='embedded-debezium.test_schema.employee', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{embedded-debezium.test_schema.employee.Key:STRUCT}, value=Struct{after=Struct{id=1,firstname=John,lastname=Doe 1,email=john1@doe.com,age=18,salary=1234.23},source=Struct{version=2.3.1.Final,connector=postgresql,name=embedded-debezium,ts_ms=1705916606794,snapshot=last,db=postgres,sequence=[null,"22518160"],schema=test_schema,table=employee,txId=743,lsn=22518160},op=r,ts_ms=1705916606890}, valueSchema=Schema{embedded-debezium.test_schema.employee.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)} We did it. We managed to run Debezium through a Java application without the need for a standalone Debezium server running or a streaming component. You can find the code on GitHub.
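One practical note on the main class above: engine.run() blocks the calling thread, and the engine is never closed explicitly. A common pattern, sketched below using the same properties file and consumer as the original example, is to run the engine on an executor and close it on JVM shutdown (DebeziumEngine implements both Runnable and Closeable):
Java
package com.egkatzioura;

import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.ChangeEventFormat;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ManagedApplication {

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        try (InputStream stream = ManagedApplication.class.getClassLoader()
                .getResourceAsStream("embedded_debezium.properties")) {
            properties.load(stream);
        }
        properties.put("offset.storage.file.filename", new File("offset.dat").getAbsolutePath());

        var engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(properties)
                .notifying(new CustomChangeConsumer())
                .build();

        // Run the engine on its own thread instead of blocking main directly
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);

        // Close the engine and the executor when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                engine.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                executor.shutdown();
            }
        }));
    }
}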
The software development landscape is rapidly evolving. New tools, technologies, and trends are always bubbling to the top of our workflows and conversations. One of those paradigm shifts that has become more pronounced in recent years is the adoption of microservices architecture by countless organizations. Managing microservices communication has been a sticky challenge for many developers. As a microservices developer, I want to focus my efforts on the core business problems and functionality that my microservices need to achieve. I’d prefer to offload the inter-service communication concerns—just like I do with authentication or API security. So, that brings me to the KubeMQ Control Center (KCC). It’s a service for managing microservices communication that’s quick to set up and designed with an easy-to-use UI. In this article, I wanted to unpack some of the functionality I explored as I tested it in a real-world scenario. Setting the Scene Microservices communication presents a complex challenge, akin to orchestrating a symphony with numerous distinct instruments. It demands precision and a deep understanding of the underlying architecture. Fortunately, KCC—with its no-code setup and Kubernetes-native integration—aims to abstract away this complexity. Let's explore how it simplifies microservices messaging. Initial Setup and Deployment Deploy KubeMQ Using Docker The journey with KCC starts with a Docker-based deployment. This process is straightforward: Shell $ docker run -d \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN=(add token here) kubemq/kubemq This command sets up KubeMQ, aligning the necessary ports and establishing secure access. Send a "Hello World" Message After deployment, you can access the KubeMQ dashboard in your browser at http://localhost:8080/. Here, you have a clean, intuitive UI to help you manage your microservices. We can send a “Hello World” message to test the waters. In the Dashboard, click Send Message and select Queues. We set a channel name (q1) and enter "hello world!" in the body. Then, we click Send. Just like that, we successfully created our first message! And it’s only been one minute since we deployed KubeMQ and started using KCC. Pulling a Message Retrieving messages is a critical aspect of any messaging platform. From the Dashboard, select your channel to open the Queues page. Under the Pull tab, click Pull to retrieve the message that you just sent. The process is pretty smooth and efficient. We can review the message details for insights into its delivery and content. Send “Hello World” With Code Moving beyond the UI, we can send a “Hello world” message programmatically too. For example, here’s how you would send a message using C#. KCC integrates with most of the popular programming languages, which is essential for diverse development environments. Here are the supported languages and links to code samples and SDKs: C# and .NET Java Go Node.js Python Deploying KubeMQ in Kubernetes Transitioning to Kubernetes with KCC is pretty seamless, too. KubeMQ is shooting to design with scalability and the developer in mind. Here’s a quick guide to getting started. Download KCC Download KCC from KubeMQ’s account area. They offer a 30-day free trial so you can do a comprehensive evaluation. Unpack the Zip File Shell $ unzip kcc_mac_apple.zip -d /kubemq/kcc Launch the Application Shell $ ./kcc The above step integrates you into the KubeMQ ecosystem, which is optimized for Kubernetes. 
Add a KubeMQ Cluster Adding a KubeMQ cluster is crucial for scaling and managing your microservices architecture effectively. Monitor Cluster Status The dashboard provides an overview of your KubeMQ components, essential for real-time system monitoring. Explore Bridges, Targets, and Sources KCC has advanced features like Bridges, Targets, and Sources, which serve as different types of connectors between KubeMQ clusters, external messaging systems, and external cloud services. These tools will come in handy when you have complex data flows and system integrations, as many microservices architectures do. Conclusion That wraps up our journey through KubeMQ's Control Center. Dealing with the complexities of microservice communication can be a burden, taking the developer away from core business development. Developers can offload that burden to KCC. With its intuitive UI and suite of features, KCC helps developers be more efficient as they build their applications on microservice architectures. Of course, we’ve only scratched the surface here. Unlocking the true potential of any tool requires deeper exploration and continued use. For that, you can check out KubeMQ’s docs site. Or you can build on what we’ve shown above, continuing to play around on your own. With the right tools in your toolbox, you’ll quickly be up and running with a fleet of smoothly communicating microservices! Have a really great day!