Creating a Kafka Streaming Application

Apache Kafka is a distributed streaming platform. You can use Kafka to stream data directly from an application into HeavyDB.


This example is a bare-bones click-through application that captures user activity.

This example assumes you have already installed and configured Apache Kafka. See the Kafka website. The FlavorPicker example also has dependencies on Swing/AWT classes. See the Oracle Java SE website.

Creating a Kafka Producer

FlavorPicker.java sends the choice of Chocolate, Strawberry, or Vanilla to the Kafka broker. This example uses only one column of information, but the mechanism is the same for records of any size.

package flavors;

// Swing/AWT Interface classes
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.EventQueue;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JLabel;

// Generic Java properties object
import java.util.Properties;

// Kafka Producer-specific classes
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FlavorPicker {

    private JFrame frmFlavors;
    private Producer<String, String> producer;
    /**
     * Launch the application.
     */
    public static void main(String[] args) {
       EventQueue.invokeLater(new Runnable() {
          public void run() {
             try {
                FlavorPicker window = new FlavorPicker(args);
                window.frmFlavors.setVisible(true);
             } catch (Exception e) {
                e.printStackTrace();
             }
          }
       });
    }

    /**
     * Create the application.
     */
    public FlavorPicker(String[] args) {
       initialize(args);
    }

    /**
     * Initialize the contents of the frame.
     */
    private void initialize(String[] args) {
       frmFlavors = new JFrame();
       frmFlavors.setTitle("Flavors");
       frmFlavors.setBounds(100, 100, 408, 177);
       frmFlavors.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
       frmFlavors.getContentPane().setLayout(null);

       final JLabel lbl_yourPick = new JLabel("You picked nothing.");
       lbl_yourPick.setBounds(130, 85, 171, 15);
       frmFlavors.getContentPane().add(lbl_yourPick);

       JButton button = new JButton("Strawberry");
       button.addActionListener(new ActionListener() {
          public void actionPerformed(ActionEvent arg0) {
             lbl_yourPick.setText("You picked strawberry.");
             pick(args,1);
          }
       });
       button.setBounds(141, 12, 114, 25);
       frmFlavors.getContentPane().add(button);

       JButton btnVanilla = new JButton("Vanilla");
       btnVanilla.addActionListener(new ActionListener() {
          public void actionPerformed(ActionEvent e) {
             lbl_yourPick.setText("You picked vanilla.");
             pick(args,2);
          }
       });
       btnVanilla.setBounds(278, 12, 82, 25);
       frmFlavors.getContentPane().add(btnVanilla);


       JButton btnChocolate = new JButton("Chocolate");
       btnChocolate.addActionListener(new ActionListener() {
          public void actionPerformed(ActionEvent e) {
             lbl_yourPick.setText("You picked chocolate.");
             pick(args, 0);
          }
       });

       btnChocolate.setBounds(12, 12, 105, 25);
       frmFlavors.getContentPane().add(btnChocolate);
    }
    public void pick(String[] args, int x) {
        String topicName = args[0];
        String[] value = {"chocolate", "strawberry", "vanilla"};

        // Set the producer configuration properties.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9097");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 100);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        // Instantiate a producer.
        producer = new KafkaProducer<String, String>(props);

        // Send a 1,000-record stream to the Kafka broker.
        for (int y = 0; y < 1000; y++) {
            producer.send(new ProducerRecord<String, String>(topicName, value[x]));
        }
    }
} // End FlavorPicker.java
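Note that FlavorPicker never flushes or closes its producer, so records can sit in the client-side buffer until batch.size or linger.ms triggers a send. As a minimal sketch of one way to guard against losing buffered records on exit (an addition for illustration, not part of the original sample), you could register a JVM shutdown hook in initialize():

// Hypothetical addition to initialize(): flush and close the producer when
// the JVM exits, so any records still buffered by the Kafka client reach
// the broker.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    public void run() {
        if (producer != null) {
            producer.flush();  // push any batched records to the broker
            producer.close();  // release the client's network resources
        }
    }
}));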

Creating a Kafka Consumer

FlavorConsumer.java polls the Kafka broker periodically, pulls any new records added since the last poll, and loads them into HeavyDB. Ideally, each batch should be fairly substantial in size, at least 1,000 rows, so that the server is not overburdened with many small writes.

package flavors;

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

//JDBC
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Usage: FlavorConsumer <topic-name> <password>

public class FlavorConsumer {
    public static void main(String[] args) throws Exception {
       if (args.length < 2) {
          System.out.println("Usage: FlavorConsumer <topic-name> <password>");
          return;
       }
       // Configure the Kafka Consumer
       String topicName = args[0].toString();
       Properties props = new Properties();

       props.put("bootstrap.servers", "localhost:9097"); // Use 9097 so as not
                                              // to collide with
                                              // Heavy Immerse
       props.put("group.id", "test");
       props.put("enable.auto.commit", "true");
       props.put("auto.commit.interval.ms", "1000");
       props.put("session.timeout.ms", "30000");
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

       // Subscribe the Kafka Consumer to the topic.
       consumer.subscribe(Arrays.asList(topicName));

       // print the topic name
       System.out.println("Subscribed to topic " + topicName);

       String flavorValue = "";

       while (true) {
          ConsumerRecords<String, String> records = consumer.poll(1000);

          // Create connection and prepared statement objects
          Connection conn = null;
          PreparedStatement pstmt = null;

          try {
             // JDBC driver name and database URL
             final String JDBC_DRIVER = "ai.heavy.jdbc.HeavyAIDriver";
             final String DB_URL = "jdbc:heavyai:localhost:6274:heavyai";

             // Database credentials
             final String USER = "heavyai";
             final String PASS = args[1].toString();

             // STEP 1: Register JDBC driver
             Class.forName(JDBC_DRIVER);

             // STEP 2: Open a connection
             conn = DriverManager.getConnection(DB_URL, USER, PASS);

             // STEP 3: Prepare a statement template
             pstmt = conn.prepareStatement("INSERT INTO flavors VALUES (?)");

             // STEP 4: Populate the prepared statement batch
             for (ConsumerRecord<String, String> record : records) {
                flavorValue = record.value();
                pstmt.setString(1, flavorValue);
                pstmt.addBatch();
             }

             // STEP 5: Execute the batch statement (send records to HeavyDB)
             pstmt.executeBatch();

             // Commit and close the connection.
             conn.commit();
             conn.close();

          } catch (SQLException se) {
             // Handle errors for JDBC
             se.printStackTrace();

          } catch (Exception e) {
             // Handle errors for Class.forName
             e.printStackTrace();
          } finally {

             try {
                if (pstmt != null) {
                   pstmt.close();
                }
             } catch (SQLException se2) {
             } // nothing we can do

             try {
                if (conn != null) {
                   conn.close();
                }
             } catch (SQLException se) {
                se.printStackTrace();
             } // end finally try

          } // end finally
       } // end while
    } // end main
} // End FlavorConsumer.java
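One design note on the consumer: it opens and closes a JDBC connection on every poll, which keeps the example simple but adds per-batch overhead. For a long-running consumer, you might instead hoist the connection and prepared statement out of the loop. A minimal sketch of that variation (a rearrangement for illustration, assuming the connection stays healthy; production code would add reconnect handling):

// Open the connection and statement once, then reuse them across polls.
// DB_URL, USER, and PASS are as defined in the sample above.
Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO flavors VALUES (?)");
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        pstmt.setString(1, record.value());  // one row per Kafka record
        pstmt.addBatch();
    }
    pstmt.executeBatch();  // send the batch to HeavyDB
    conn.commit();
}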

Running the Kafka Click-through Application

To run the application, you need to perform the following tasks:

  • Compile FlavorConsumer.java and FlavorPicker.java.

  • Create a table in HeavyDB.

  • Start the Zookeeper server.

  • Start the Kafka server.

  • Start the Kafka consumer.

  • Start the Kafka producer.

  • View the results using heavysql and Heavy Immerse.

  1. Compile FlavorConsumer.java and FlavorPicker.java, storing the resulting class files in $HEAVYAI_PATH/SampleCode/kafka-clickthrough/bin.
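    A possible compile command (illustrative; adjust the paths to your installation), run from the directory containing the two source files:

    javac -cp .:<kafka-directory-path>/libs/* -d $HEAVYAI_PATH/SampleCode/kafka-clickthrough/bin FlavorPicker.java FlavorConsumer.java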

  2. Using heavysql, create the table flavors with one column, flavor, in HeavyDB. See heavysql for more information.

    heavysql> CREATE TABLE flavors (flavor TEXT ENCODING DICT);
  3. Open a new terminal window.

  4. Go to your Kafka directory.

  5. Start the Zookeeper server with the following command.

    ./bin/zookeeper-server-start.sh config/zookeeper.properties
  6. Open a new terminal window.

  7. Go to the Kafka directory.

  8. Start the Kafka server with the following command.

    ./bin/kafka-server-start.sh config/server.properties
  9. Open a new terminal window.

  10. Go to the Kafka directory.

  11. Create a new Kafka topic with the following command. This creates a basic topic with only one replica and one partition. See the Kafka documentation for more information.

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
    --partitions 1 --topic myflavors
  12. Open a new terminal window.

  13. Launch FlavorConsumer with the following command, substituting the actual path to the Kafka directory and your HeavyDB password.

    java  -cp .:<kafka-directory-path>/libs/*:$HEAVYAI_PATH/bin/*:$HEAVYAI_PATH/SampleCode/kafka-clickthrough/bin flavors.FlavorConsumer myflavors <myPassword>
  14. Launch FlavorPicker with the following command.

    java  -cp .:<kafka-directory-path>/libs/*:$HEAVYAI_PATH/bin/*:$HEAVYAI_PATH/SampleCode/kafka-clickthrough/bin flavors.FlavorPicker myflavors
  15. Click to create several records for Chocolate, Strawberry, and Vanilla. Each click generates 1,000 records.

  16. Use heavysql to see that the results have arrived in HeavyDB.

  17. Use Heavy Immerse to visualize the results.
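For a quick spot check in heavysql (step 16), a query along these lines (illustrative, not from the original tutorial) confirms that clicks are arriving:

    heavysql> SELECT flavor, COUNT(*) AS n FROM flavors GROUP BY flavor;

If rows are not arriving, you can verify that records are at least reaching the broker with Kafka's console consumer, assuming a stock Kafka installation and the broker port used throughout this example (9097):

    ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9097 --topic myflavors --from-beginning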
