Loading Data with SQL

This topic describes several ways to load data to HEAVY.AI using SQL commands.

  • If there is a potential for duplicate entries, and you want to avoid loading duplicate rows, see How can I avoid creating duplicate rows? on the Troubleshooting page.

  • If a source file uses a reserved word, HEAVY.AI automatically adds an underscore at the end of the reserved word. For example, year is converted to year_.

COPY FROM

CSV/TSV Import

Use the following syntax for CSV and TSV files:

COPY <table> FROM '<file pattern>' [WITH (<property> = value, ...)];

<file pattern> must be local on the server. The file pattern can contain wildcards if you want to load multiple files. In addition to CSV, TSV, and TXT files, you can import compressed files in TAR, ZIP, 7-ZIP, RAR, GZIP, BZIP2, or TGZ format.

COPY FROM appends data from the source into the target table. It does not truncate the table or overwrite existing data.

You can import client-side files (using the \copy command in heavysql), but doing so is significantly slower. For large files, HEAVY.AI recommends that you first scp the file to the server and then issue the COPY command.

HEAVY.AI supports Latin-1 ASCII format and UTF-8. If you want to load data with another encoding (for example, UTF-16), convert the data to UTF-8 before loading it to HEAVY.AI.

Available properties in the optional WITH clause are described in the following table.

By default, the CSV parser assumes one row per line. To import a file with multiple lines in a single field, specify threads = 1 in the WITH clause.

Examples

COPY tweets FROM '/tmp/tweets.csv' WITH (nulls = 'NA'); 
COPY tweets FROM '/tmp/tweets.tsv' WITH (delimiter = '\t', quoted = 'false'); 
COPY tweets FROM '/tmp/*' WITH (header='false'); 
COPY trips FROM '/mnt/trip/trip.parquet/part-00000-0284f745-1595-4743-b5c4-3aa0262e4de3-c000.snappy.parquet' with (parquet='true');
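
If a file has quoted fields that contain embedded newlines, import it single-threaded as noted above. A minimal sketch (the table name and file path are hypothetical):

COPY articles FROM '/tmp/articles.csv' WITH (threads = 1);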

Geo Import

You can use COPY FROM to import geo files. You can create the table based on the source file and then load the data:

COPY FROM 'source' WITH (source_type='geo_file', ...);

You can also append data to an existing, predefined table:

COPY tableName FROM 'source' WITH (source_type='geo_file', ...);

Use the following syntax, depending on the file source.

  • If you are using COPY FROM to load to an existing table, the field type must match the metadata of the source file. If it does not, COPY FROM throws an error and does not load the data.

  • COPY FROM appends data from the source into the target table. It does not truncate the table or overwrite existing data.

  • Supported DATE formats when using COPY FROM include mm/dd/yyyy, dd-mmm-yy, yyyy-mm-dd, and dd/mmm/yyyy.

  • COPY FROM fails for records with latitude or longitude values that have more than 4 decimal places.
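
For example, the following sketch (the table name and file path are hypothetical) appends a GeoJSON file to an existing table:

COPY county_boundaries FROM '/data/counties.geojson' WITH (source_type='geo_file');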

The following WITH options are available for geo file imports from all sources.

Currently, a manually created geo table can have only one geo column. If it has more than one, import is not performed.

Any GDAL-supported file type can be imported. If it is not supported, GDAL throws an error.

An ESRI file geodatabase can have multiple layers, and importing it results in the creation of one table for each layer in the file. This behavior differs from that of importing shapefiles, GeoJSON, or KML files, which results in a single table. For more information, see Importing an ESRI File Geodatabase.

The first compatible file in the bundle is loaded; subfolders are traversed until a compatible file is found. The rest of the contents in the bundle are ignored. If the bundle contains multiple filesets, unpack the file manually and specify it for import.

For more information about importing specific geo file formats, see Importing Geospatial Files.

CSV files containing WKT strings are not considered geo files and should not be parsed with the source_type='geo_file' option. When importing WKT strings from CSV files, you must create the table first and specify the geo column type and encoding as part of the DDL. For example, for a polygon column in SRID 4326 with 32-bit compression, try the following:

ggpoly GEOMETRY(POLYGON, 4326) ENCODING COMPRESSED(32)
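
A minimal sketch (table, column, and file names are hypothetical) that creates such a table and then imports the CSV:

CREATE TABLE poly_data (
  id INTEGER,
  ggpoly GEOMETRY(POLYGON, 4326) ENCODING COMPRESSED(32));

COPY poly_data FROM '/tmp/poly_data.csv';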

Raster Import

You can use COPY FROM to import raster files supported by GDAL as one row per pixel, where a pixel may consist of one or more data bands, with optional corresponding pixel or world-space coordinate columns. This allows the data to be rendered as a point/symbol cloud that approximates a 2D image.

COPY FROM 'source' WITH (source_type='raster_file', ...);

Use the same syntax that you would for geo files, depending on the file source.

The following WITH options are available for raster file imports from all sources.

Illegal combinations of raster_point_type and raster_point_transform are rejected. For example, world transform can only be performed on raster files that have a geospatial coordinate system in their metadata, and cannot be performed if raster_point_type is an integer format (which cannot represent world-space coordinate values).
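
For example, the following sketch (the table name, file path, and option values are illustrative) imports a GeoTIFF and stores world-space point coordinates as DOUBLE columns:

COPY elevation FROM '/data/elevation.tif' WITH (source_type='raster_file', raster_point_type='double', raster_point_transform='world');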

Any GDAL-supported file type can be imported. If it is not supported, GDAL throws an error.

HDF5 and possibly other GDAL drivers may not be thread-safe, so use WITH (threads=1) when importing.
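
For example, a single-threaded import sketch for an HDF5 file (the table name and file path are hypothetical):

COPY sensor_grid FROM '/data/sensor_grid.h5' WITH (source_type='raster_file', threads=1);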

Archive file import (.zip, .tar, .tar.gz) is not currently supported for raster files.

Band and Column Names

The following raster file formats contain the metadata required to derive sensible names for the bands, which are then used for their corresponding columns:

  • GRIB2 - geospatial/meteorological format

  • OME TIFF - an OpenMicroscopy format

The band names from the file are sanitized (illegal characters and spaces removed) and de-duplicated (addition of a suffix in cases where the same band name is repeated within the file or across datasets).

For other formats, the columns are named band_1_1, band_1_2, and so on.

The sanitized and de-duplicated names must be used for the raster_import_bands option.
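
For example, the following sketch (the table name, file path, and band names are hypothetical) imports only two named bands from a GRIB2 file:

COPY wind_data FROM '/data/forecast.grib2' WITH (source_type='raster_file', raster_import_bands='UGRD_10maboveground,VGRD_10maboveground');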

Band and Column Data Types

Raster files can have bands in the following data types:

  • Signed or unsigned 8-, 16-, or 32-bit integer

  • 32- or 64-bit floating point

  • Complex number formats (not supported)

Signed data is stored in the directly corresponding column type, as follows:

int8    -> TINYINT
int16   -> SMALLINT
int32   -> INT
float32 -> FLOAT
float64 -> DOUBLE

Unsigned integer column types are not currently supported, so any data of those types is converted to the next size up signed column type:

uint8  -> SMALLINT
uint16 -> INT
uint32 -> BIGINT

Column types cannot currently be overridden.

ODBC Import

ODBC import is currently a beta feature.

You can use COPY FROM to import data from a Relational Database Management System (RDBMS) or data warehouse using the Open Database Connectivity (ODBC) interface.

COPY <table_name> FROM '<select_query>' WITH (source_type = 'odbc', ...);

The following WITH options are available for ODBC import.

Examples

Using a data source name:

COPY example_table
  FROM 'SELECT * FROM remote_postgres_table WHERE event_timestamp > ''2020-01-01'';'
  WITH 
    (source_type = 'odbc', 
     sql_order_by = 'event_timestamp',
     data_source_name = 'postgres_db_1',
     username = 'my_username',
     password = 'my_password');

Using a connection string:

COPY example_table
  FROM 'SELECT * FROM remote_postgres_table WHERE event_timestamp > ''2020-01-01'';'
  WITH 
    (source_type = 'odbc',
     sql_order_by = 'event_timestamp',
     connection_string = 'Driver=PostgreSQL;Database=my_postgres_db;Servername=my_postgres.example.com;Port=1234',
     credential_string = 'Username=my_username;Password=my_password');

For information about using ODBC HeavyConnect, see ODBC Data Wrapper Reference.

Globbing, Filtering, and Sorting Parquet and CSV Files

These examples assume the following folder and file structure:

Globbing

Local Parquet/CSV files can now be globbed by specifying either a path name with a wildcard or a folder name.

Globbing a folder recursively returns all files under the specified folder. For example,

COPY table_1 FROM ".../subdir";

returns file_3, file_4, file_5.

Globbing with a wildcard returns any file paths matching the expanded file path. So

COPY table_1 FROM ".../subdir/file*";

returns file_3, file_4.

Globbing does not apply to S3 use cases, because file paths specified for S3 always use prefix matching.

Filtering

Use file filtering to filter out unwanted files that have been globbed. To use filtering, specify the REGEX_PATH_FILTER option. Files that do not match the pattern are not imported. Filtering behaves consistently across local and S3 use cases.

For example, the following command:

COPY table_1 from ".../" WITH (REGEX_PATH_FILTER=".*file_[4-5]");

returns file_4, file_5.

Sorting

Use the FILE_SORT_ORDER_BY option to specify the order in which files are imported.

FILE_SORT_ORDER_BY Options

  • pathname (default)

  • date_modified

  • regex *

  • regex_date *

  • regex_number *

*FILE_SORT_REGEX option required

Using FILE_SORT_ORDER_BY

COPY table_1 from ".../" WITH (FILE_SORT_ORDER_BY="date_modified");

Using FILE_SORT_ORDER_BY with FILE_SORT_REGEX

Regex sort keys are formed by the concatenation of all capture groups from the FILE_SORT_REGEX expression. Regex sort keys are strings but can be converted to dates or FLOAT64 with the appropriate FILE_SORT_ORDER_BY option. File paths that do not match the provided capture groups or that cannot be converted to the appropriate date or FLOAT64 are treated as NULLs and sorted to the front in a deterministic order.

Multiple Capture Groups:

FILE_SORT_REGEX=".*/data_(.*)_(.*)_" /root/dir/unmatchedFile<NULL> /root/dir/data_andrew_54321_andrew54321 /root/dir2/data_brian_Josef_brianJosef

Dates:

FILE_SORT_REGEX=".*data_(.*) /root/data_222<NULL> (invalid date conversion) /root/data_2020-12-312020-12-31 /root/dir/data_2021-01-012021-01-01

Import:

COPY table_1 from ".../" WITH (FILE_SORT_ORDER_BY="regex", FILE_SORT_REGEX=".*file_(.)");
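
Similarly, to sort by the date values captured in the Dates example above, a sketch might look like the following (the path is a placeholder):

COPY table_1 from ".../" WITH (FILE_SORT_ORDER_BY="regex_date", FILE_SORT_REGEX=".*data_(.*)");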

Geo and Raster File Globbing

Limited filename globbing is supported for both geo and raster import. For example, to import a sequence of same-format GeoTIFF files into a single table, you can run the following:

COPY table FROM '/path/path/something_*.tiff' WITH (source_type='raster_file')

The files are imported in alphanumeric sort order, per regular glob rules, and all appended to the same table. This may fail if the files are not all of the same format (band count, names, and types).

For non-geo/raster files (CSV and Parquet), you can provide just the path to the directory OR a wildcard; for example:

/path/to/directory/
/path/to/directory
/path/to/directory/*

For geo/raster files, a wildcard is required, as shown in the last example.

SQLImporter

SQLImporter is a Java utility run at the command line. It runs a SELECT statement on another database through JDBC and loads the result set into HeavyDB.

Usage

java -cp [HEAVY.AI utility jar file]:[3rd party JDBC driver]
SQLImporter
-u <userid> -p <password> [(--binary|--http|--https [--insecure])]
-s <heavyai server host> -db <heavyai db> --port <heavyai server port>
[-d <other database JDBC driver class>] -c <other database JDBC connection string>
-su <other database user> -sp <other database user password> -ss <other database sql statement>
-t <HEAVY.AI target table> -b <transfer buffer size> -f <table fragment size>
[-tr] [-nprg] [-adtf] [-nlj] -i <init commands file>

Flags

-r <arg>                               Row load limit 
-h,--help                              Help message 
-u,--user <arg>                        HEAVY.AI user 
-p,--passwd <arg>                      HEAVY.AI password 
--binary                               Use binary transport to connect to HEAVY.AI 
--http                                 Use http transport to connect to HEAVY.AI 
--https                                Use https transport to connect to HEAVY.AI 
-s,--server <arg>                      HEAVY.AI Server 
-db,--database <arg>                   HEAVY.AI Database 
--port <arg>                           HEAVY.AI Port 
--ca-trust-store <arg>                 CA certificate trust store 
--ca-trust-store-passwd <arg>          CA certificate trust store password 
--insecure                             Insecure TLS - Do not validate HEAVY.AI 
                                       server certificates 
-d,--driver <arg>                      JDBC driver class 
-c,--jdbcConnect <arg>                 JDBC connection string 
-su,--sourceUser <arg>                 Source user 
-sp,--sourcePasswd <arg>               Source password 
-ss,--sqlStmt <arg>                    SQL Select statement 
-t,--targetTable <arg>                 HEAVY.AI Target Table 
-b,--bufferSize <arg>                  Transfer buffer size 
-f,--fragmentSize <arg>                Table fragment size 
-tr,--truncate                         Truncate table if it exists 
-nprg,--noPolyRenderGroups             Disable render group assignment 
-adtf,--allowDoubleToFloat             Allow narrow casting 
-nlj,--no-log-jdbc-connection-string   Omit JDBC connection string from logs 
-i,--initializeFile <arg>              File containing init commands for the remote database

HEAVY.AI recommends that you use a service account with read-only permissions when accessing data from a remote database.

In release 4.6 and higher, the user ID (-u) and password (-p) flags are required. If your password includes a special character, you must escape the character using a backslash (\).

If the table does not exist in HeavyDB, SQLImporter creates it. If the target table in HeavyDB does not match the SELECT statement metadata, SQLImporter fails.

If the truncate flag is used, SQLImporter truncates the table in HeavyDB before transferring the data. If the truncate flag is not used, SQLImporter appends the results of the SQL statement to the target table in HeavyDB.

The -i argument provides a path to an initialization file. Each line of the file is sent as a SQL statement to the remote database. You can use -i to set additional custom parameters before the data is loaded.
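
For example, a hypothetical init file for a PostgreSQL source might contain one statement per line, such as:

SET search_path TO analytics;
SET statement_timeout = 0;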

The SQLImporter class name is case-sensitive. Using incorrect case returns the following error:

Error: Could not find or load main class com.mapd.utility.SQLimporter

PostgreSQL/PostGIS Support

You can migrate geo data types from a PostgreSQL database. The following table shows the correlation between PostgreSQL/PostGIS geo types and HEAVY.AI geo types.

Other PostgreSQL types, including circle, box, and path, are not supported.

HeavyDB Example

java -cp /opt/heavyai/bin/heavyai-utility-<db-version>.jar 
com.mapd.utility.SQLImporter -u admin -p HyperInteractive -db heavyai --port 6274 
-t mytable -su admin -sp HyperInteractive -c "jdbc:heavyai:myhost:6274:heavyai" 
-ss "select * from mytable limit 1000000000"

By default, 100,000 records are selected from HeavyDB. To select a larger number of records, use the LIMIT statement.

Hive Example

java -cp /opt/heavyai/bin/heavyai-utility-<db-version>.jar:/hive-jdbc-1.2.1000.2.6.1.0-129-standalone.jar
com.mapd.utility.SQLImporter
-u user -p password
-db Heavyai_database_name --port 6274 -t Heavyai_table_name
-su source_user -sp source_password
-c "jdbc:hive2://server_address:port_number/database_name"
-ss "select * from source_table_name"

Google Big Query Example

java -cp /opt/heavyai/bin/heavyai-utility-<db-version>.jar:./GoogleBigQueryJDBC42.jar:
./google-oauth-client-1.22.0.jar:./google-http-client-jackson2-1.22.0.jar:./google-http-client-1.22.0.jar:./google-api-client-1.22.0.jar:
./google-api-services-bigquery-v2-rev355-1.22.0.jar 
com.mapd.utility.SQLImporter
-d com.simba.googlebigquery.jdbc42.Driver 
-u user -p password
-db Heavyai_database_name --port 6274 -t Heavyai_table_name
-su source_user -sp source_password 
-c "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=project-id;OAuthType=0;
OAuthServiceAcctEmail=email@domain.iam.gserviceaccount.com;OAuthPvtKeyPath=/home/simba/myproject.json;"
-ss "select * from schema.source_table_name"

PostgreSQL Example

java -cp /opt/heavyai/bin/heavyai-utility-<db-version>.jar:/tmp/postgresql-42.2.5.jar 
com.mapd.utility.SQLImporter 
-u user -p password
-db Heavyai_database_name --port 6274 -t Heavyai_table_name
-su source_user -sp source_password 
-c "jdbc:postgresql://127.0.0.1/postgres"
-ss "select * from schema_name.source_table_name"

SQLServer Example

java -cp /opt/heavyai/bin/heavyai-utility-<db-version>.jar:/path/sqljdbc4.jar
com.mapd.utility.SQLImporter
-d com.microsoft.sqlserver.jdbc.SQLServerDriver 
-u user -p password
-db Heavyai_database_name --port 6274 -t Heavyai_table_name
-su source_user -sp source_password 
-c "jdbc:sqlserver://server:port;DatabaseName=database_name"
-ss "select top 10 * from dbo.source_table_name"

MySQL Example

java -cp /opt/heavyai/bin/heavyai-utility-<db-version>.jar:mysql/mysql-connector-java-5.1.38-bin.jar
com.mapd.utility.SQLImporter 
-u user -p password
-db Heavyai_database_name --port 6274 -t Heavyai_table_name
-su source_user -sp source_password 
-c "jdbc:mysql://server:port/database_name"
-ss "select * from schema_name.source_table_name"

StreamInsert

Stream data into HeavyDB by attaching the StreamInsert program to the end of a data stream. The data stream can be another program printing to standard out, a Kafka endpoint, or any other real-time stream output. You can specify the appropriate batch size, according to the expected stream rates and your insert frequency. The target table must exist before you attempt to stream data into the table.

<data stream> | StreamInsert <table name> <database name> \
{-u|--user} <user> {-p|--passwd} <password> [{--host} <hostname>] \
[--port <port number>][--delim <delimiter>][--null <null string>] \
[--line <line delimiter>][--batch <batch size>][{-t|--transform} \
transformation ...][--retry_count <num_of_retries>] \
[--retry_wait <wait in secs>][--print_error][--print_transform]

For more information on creating regex transformation statements, see RegEx Replace.

Example

cat file.tsv | /path/to/heavyai/SampleCode/StreamInsert stream_example \
heavyai --host localhost --port 6274 -u imauser -p imapassword \
--delim '\t' --batch 1000

Importing AWS S3 Files

You can use the SQL COPY FROM statement to import files stored on Amazon Web Services Simple Storage Service (AWS S3) into a HEAVY.AI table, in much the same way you would with local files. In the WITH clause, specify the S3 credentials and region information for the bucket accessed.

COPY <table> FROM '<S3_file_URL>' WITH ([[s3_access_key = '<key_name>',s3_secret_key = '<key_secret>',] | [s3_session_token = '<AWS_session_token>']] s3_region = '<region>');

Access key and secret key, or session token if using temporary credentials, and region are required. For information about AWS S3 credentials, see https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys.

HEAVY.AI does not support the use of asterisks (*) in URL strings to import items. To import multiple files, pass in an S3 path instead of a file name, and COPY FROM imports all items in that path and any subpath.

Custom S3 Endpoints

HEAVY.AI supports custom S3 endpoints, which allows you to import data from S3-compatible services, such as Google Cloud Storage.

To use custom S3 endpoints, add s3_endpoint to the WITH clause of a COPY FROM statement; for example, to set the S3 endpoint to point to Google Cloud Services:

COPY trips FROM 's3://heavyai-importtest-data/trip-data/trip_data_9.gz' WITH (header='true', s3_endpoint='storage.googleapis.com');

For information about interoperability and setup for Google Cloud Services, see Cloud Storage Interoperability.

You can also configure custom S3 endpoints by passing the s3_endpoint field to Thrift import_table.

Examples

The following examples show failed and successful attempts to copy the table trips from AWS S3.

heavysql> COPY trips FROM 's3://heavyai-s3-no-access/trip_data_9.gz';
Exception: failed to list objects of s3 url 's3://heavyai-s3-no-access/trip_data_9.gz': AccessDenied: Access Denied
heavysql> COPY trips FROM 's3://heavyai-s3-no-access/trip_data_9.gz' with (s3_access_key='xxxxxxxxxx',s3_secret_key='yyyyyyyyy');
Exception: failed to list objects of s3 url 's3://heavyai-s3-no-access/trip_data_9.gz': AuthorizationHeaderMalformed: Unable to parse ExceptionName: AuthorizationHeaderMalformed Message: The authorization header is malformed; the region 'us-east-1' is wrong; expecting 'us-west-1'
heavysql> COPY trips FROM 's3://heavyai-testdata/trip.compressed/trip_data_9.csv' with (s3_access_key='xxxxxxxx',s3_secret_key='yyyyyyyy',s3_region='us-west-1');
Result
Loaded: 100 recs, Rejected: 0 recs in 0.361000 secs

The following example imports all the files in the trip.compressed directory.

heavysql> copy trips from 's3://heavyai-testdata/trip.compressed/' with (s3_access_key='xxxxxxxx',s3_secret_key='yyyyyyyy',s3_region='us-west-1');
Result
Loaded: 105200 recs, Rejected: 0 recs in 1.890000 secs

trips Table

The table trips is created with the following statement:

heavysql> \d trips
        CREATE TABLE trips (
        medallion TEXT ENCODING DICT(32),
        hack_license TEXT ENCODING DICT(32),
        vendor_id TEXT ENCODING DICT(32),
        rate_code_id SMALLINT,
        store_and_fwd_flag TEXT ENCODING DICT(32),
        pickup_datetime TIMESTAMP,
        dropoff_datetime TIMESTAMP,
        passenger_count SMALLINT,
        trip_time_in_secs INTEGER,
        trip_distance DECIMAL(14,2),
        pickup_longitude DECIMAL(14,2),
        pickup_latitude DECIMAL(14,2),
        dropoff_longitude DECIMAL(14,2),
        dropoff_latitude DECIMAL(14,2))
WITH (FRAGMENT_SIZE = 75000000);

Using Server Privileges to Access AWS S3

You can configure the HEAVY.AI server to provide AWS credentials, which allows S3 queries to be run without specifying AWS credentials in the request. S3 regions are not configured by the server and must be passed in either as a client-side environment variable or as an option with the request.

Example Commands

  • \detect:

    $ export AWS_REGION=us-west-1
    heavysql > \detect <s3-bucket-uri>

  • import_table: $ ./Heavyai-remote -h localhost:6274 import_table "'<session-id>'" "<table-name>" '<s3-bucket-uri>' 'TCopyParams(s3_region="'us-west-1'")'

  • COPY FROM: heavysql > COPY <table-name> FROM <s3-bucket-uri> WITH(s3_region='us-west-1');

Configuring AWS Credentials

  1. Enable server privileges in the server configuration file heavy.conf:

     allow-s3-server-privileges = true

  2. For bare-metal installations, set the following environment variables and restart the HeavyDB service:

     AWS_ACCESS_KEY_ID=xxx
     AWS_SECRET_ACCESS_KEY=xxx
     AWS_SESSION_TOKEN=xxx (required only for AWS STS credentials)

  3. For HeavyDB Docker images, start a new container with the configuration file mounted using the option -v <dirname-containing-heavy.conf>:/var/lib/heavyai, and set the following environment options:

     -e AWS_ACCESS_KEY_ID=xxx
     -e AWS_SECRET_ACCESS_KEY=xxx
     -e AWS_SESSION_TOKEN=xxx (required only for AWS STS credentials)

KafkaImporter

You can ingest data from an existing Kafka producer to an existing table in HEAVY.AI using KafkaImporter on the command line:

KafkaImporter <table_name> <database_name> {-u|--user <user_name>} \
{-p|--passwd <user_password>} [{--host} <hostname>] \
[--port <HeavyDB_port>] [--http] [--https] [--skip-verify] \
[--ca-cert <path>] [--delim <delimiter>] [--batch <batch_size>] \
[{-t|--transform} transformation ...] [--retry_count <retry_number>] \
[--retry_wait <delay_in_seconds>] --null <null_value_string> [--quoted true|false] \
[--line <line_delimiter>] --brokers=<broker_name:broker_port> \ 
--group-id=<kafka_group_id> --topic=<topic_type> [--print_error] [--print_transform]

KafkaImporter requires a functioning Kafka cluster. See the Kafka website and the Confluent schema registry documentation.

KafkaImporter Options

KafkaImporter Logging Options

Configure KafkaImporter to use your target table. KafkaImporter listens to a pre-defined Kafka topic associated with your table. You must create the table before using the KafkaImporter utility. For example, you might have a table named customer_site_visit_events that listens to a topic named customer_site_visit_events_topic.
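
For example, the target table for that topic might be created with DDL along these lines (the column names and types are hypothetical):

CREATE TABLE customer_site_visit_events (
  event_id BIGINT,
  customer_id INTEGER,
  event_timestamp TIMESTAMP,
  page_url TEXT ENCODING DICT(32));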

The data format must be a record-level format supported by HEAVY.AI.

KafkaImporter listens to the topic, validates records against the target schema, and ingests topic batches of your designated size to the target table. Rejected records use the existing reject reporting mechanism. You can start, shut down, and configure KafkaImporter independent of the HeavyDB engine. If KafkaImporter is running and the database shuts down, KafkaImporter shuts down as well. Reads from the topic are nondestructive.

KafkaImporter is not responsible for event ordering; a streaming platform outside HEAVY.AI (for example, Spark Streaming or Flink) should handle the stream processing. HEAVY.AI ingests the end-state stream of post-processed events.

KafkaImporter does not handle dynamic schema creation on first ingest, but must be configured with a specific target table (and its schema) as the basis. There is a 1:1 correspondence between target table and topic.

cat tweets.tsv | ./KafkaImporter tweets_small heavyai \
-u imauser \
-p imapassword \
--delim '\t' \
--batch 100000 \
--retry_count 360 \
--retry_wait 10 \
--null null \
--port 9999 \
--brokers=localhost:9092 \
--group-id=testImport1 \
--topic=tweet

StreamImporter

StreamImporter is an updated version of the StreamInsert utility used for streaming reads from delimited files into HeavyDB. StreamImporter uses a binary columnar load path, providing improved performance compared to StreamInsert.

You can ingest data from a data stream to an existing table in HEAVY.AI using StreamImporter on the command line.

StreamImporter <table_name> <database_name> {-u|--user <user_name>} \
{-p|--passwd <user_password>} [{--host} <hostname>] [--port <HeavyDB_port>] \
[--http] [--https] [--skipverify] [--ca-cert <path>] [--delim <delimiter>] \
[--null <null string>] [--line <line delimiter>]  [--quoted <boolean>] \
 [--batch <batch_size>] [{-t|--transform} transformation ...] \
[--retry_count <number_of_retries>] [--retry_wait <delay_in_seconds>] \
[--print_error] [--print_transform]

StreamImporter Options

StreamImporter Logging Options

Configure StreamImporter to use your target table. StreamImporter listens to a pre-defined data stream associated with your table. You must create the table before using the StreamImporter utility.

The data format must be a record-level format supported by HEAVY.AI.

StreamImporter listens to the stream, validates records against the target schema, and ingests batches of your designated size to the target table. Rejected records use the existing reject reporting mechanism. You can start, shut down, and configure StreamImporter independent of the HeavyDB engine. If StreamImporter is running but the database shuts down, StreamImporter shuts down as well. Reads from the stream are non-destructive.

StreamImporter is not responsible for event ordering; a first-class streaming platform outside HEAVY.AI (for example, Spark Streaming or Flink) should handle the stream processing. HEAVY.AI ingests the end-state stream of post-processed events.

StreamImporter does not handle dynamic schema creation on first ingest, but must be configured with a specific target table (and its schema) as the basis.

There is a 1:1 correspondence between the target table and the stream.

cat tweets.tsv | ./StreamImporter tweets_small heavyai \
-u imauser \
-p imapassword \
--delim '\t' \
--batch 100000 \
--retry_count 360 \
--retry_wait 10 \
--null null \
--port 9999

Importing Data from HDFS with Sqoop

You can consume a CSV or Parquet file residing in HDFS (Hadoop Distributed File System) into HeavyDB.

Copy the HEAVY.AI JDBC driver into the Apache Sqoop library, normally found at /usr/lib/sqoop/lib/.

Example

The following is a straightforward import command. For more information on options and parameters for using Apache Sqoop, see the user guide at sqoop.apache.org.

sqoop-export --table iAmATable \
--export-dir /user/cloudera/ \
--connect "jdbc:heavyai:000.000.000.0:6274:heavyai" \
--driver com.heavyai.jdbc.HeavyaiDriver \
--username imauser \
--password imapassword \
--direct \
--batch

The --connect parameter is the address of a valid JDBC port on your HEAVY.AI instance.

Troubleshooting: Avoiding Duplicate Rows

To detect duplication prior to loading data into HeavyDB, you can perform the following steps. For this example, the files are labeled A, B, C, ..., Z.

  1. Load file A into table MYTABLE.

  2. Run the following query.

    select uniqueCol from MYTABLE group by uniqueCol having count(*) > 1;

    There should be no rows returned; if rows are returned, your first A file is not unique.

  3. Load file B into table TEMPTABLE.

  4. Run the following query.

    select t1.uniqueCol from MYTABLE t1 join TEMPTABLE t2 on t1.uniqueCol = t2.uniqueCol;

    There should be no rows returned if file B contains no values already loaded in MYTABLE. If rows are returned, fix file B using details from the selection (see the sketch after this list).

  5. Load the fixed B file into MYTABLE.

  6. Drop table TEMPTABLE.

  7. Repeat steps 3-6 for the rest of the set for each file prior to loading the data to the real MYTABLE instance.
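
As referenced in step 4, the following sketch (uniqueCol stands in for your table's unique column) returns the full rows of file B that collide with rows already loaded in MYTABLE, so you can correct the file before reloading:

select t2.* from TEMPTABLE t2 join MYTABLE t1 on t1.uniqueCol = t2.uniqueCol;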
