Category Archives: Kettle

Pentaho Data Integration

Format date in Batch yyyyMMdd

The usual approach to building a timestamp for log file names in a batch file is to use the standard Windows utilities date and time. However, the format of their output depends on the locale (for example, %DATE% may print Fri 09/13/2013 on a US system but 13.09.2013 on a German one), so it is almost impossible to write a script that runs on any machine. A solution is to create a small Java tool.

Source file (datetime.java):

import java.text.SimpleDateFormat;
import java.util.Calendar;
public class datetime {
  public static void main(String[] args) {
    System.out.println(new SimpleDateFormat("yyyyMMdd_HHmmss")
          .format(Calendar.getInstance().getTime()));
  }
}

Compilation (you need a JDK for this):

javac datetime.java

The code will be compiled into datetime.class.

Use in a batch file:

rem write the timestamp into a temporary file and read it into a variable
java datetime > logdate.txt
set /p STAMP=<logdate.txt
rem name the log file with the timestamp
set LOG=C:\logs\%STAMP%.log
call Main.bat > %LOG%

Archiving Pentaho log table in SQL Server

Pentaho Data Integration displays only the last 50 log entries, but every time you open a transformation it has to read the whole log table. When the log table gets large, opening a transformation may take a long time, and PDI freezes for that time.

A solution is to archive old entries from the log table.

Create an archive table ETL_LOG_ARCHIVE with a structure identical to that of the log table ETL_LOG:

SELECT * INTO ETL_LOG_ARCHIVE FROM ETL_LOG WHERE 0=1

Move old rows from ETL_LOG to ETL_LOG_ARCHIVE:

DELETE FROM ETL_LOG
OUTPUT DELETED.* INTO ETL_LOG_ARCHIVE
WHERE ID_BATCH IN (
   SELECT ID_BATCH FROM 
   (
      SELECT 
         ID_BATCH, 
         ROW_NUMBER() OVER (
            PARTITION BY TRANSNAME 
            ORDER BY ID_BATCH DESC) RN 
         FROM ETL_LOG
   ) A
   WHERE RN > 50
) 
AND ID_BATCH NOT IN (
   SELECT MAX(ID_BATCH) 
   FROM ETL_LOG
   WHERE STATUS='end' 
   GROUP BY TRANSNAME
)

We want to keep the last 50 rows for each transformation, but we also need to keep the last successful execution of each transformation (the second NOT IN condition). Otherwise incremental loads might break if a transformation has failed more than 50 times in a row, because the log entry of the last successful run, from which Kettle derives the incremental date range, would be archived away.

Removing time part of a date fails in Kettle

This might be interesting.

I encountered an error running a Pentaho Data Integration transformation. After simplifying it I got this:

A Generate Rows step creates one row with the date field DATETIME=1982-01-01. A Calculator step computes the field DATE using the operation “Remove time from a date”. This trivial transformation failed with a weird error:

Unexpected error : 
java.lang.IllegalArgumentException: MINUTE
	at java.util.GregorianCalendar.computeTime(Unknown Source)
	at java.util.Calendar.updateTime(Unknown Source)
	at java.util.Calendar.getTimeInMillis(Unknown Source)
	at java.util.Calendar.getTime(Unknown Source)
	at org.pentaho.di.core.Const.removeTimeFromDate(Const.java:1958)
	at org.pentaho.di.core.row.ValueDataUtil.removeTimeFromDate(ValueDataUtil.java:628)
	at org.pentaho.di.trans.steps.calculator.Calculator.calcFields(Calculator.java:476)
	at org.pentaho.di.trans.steps.calculator.Calculator.processRow(Calculator.java:165)
	at org.pentaho.di.trans.step.BaseStep.runStepThread(BaseStep.java:2889)
	at org.pentaho.di.trans.steps.calculator.Calculator.run(Calculator.java:626)

After some digging into the Pentaho code, I replicated the error in plain Java:

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;

public class Prog {

    public static void main(String[] args) throws ParseException {
        // these combinations work fine:
        //String dateStr = "1982-01-01", timeZoneID = "Europe/Berlin";
        //String dateStr = "1981-01-01", timeZoneID = "Asia/Singapore";
        //String dateStr = "1982-01-01", timeZoneID = "Asia/Seoul";
        // this combination fails:
        String dateStr = "1982-01-01", timeZoneID = "Asia/Singapore";

        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        Date date = dateFormat.parse(dateStr);        

        Calendar calendar = Calendar.getInstance();
        calendar.setTimeZone(TimeZone.getTimeZone(timeZoneID));
        calendar.setLenient(false);
        calendar.setTime(date);
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);

        System.out.println(calendar.getTime());
    }
}

Here Asia/Singapore is the time zone setting of the Data Integration server.

The code failed with the same error:

Exception in thread "main" java.lang.IllegalArgumentException: MINUTE
	at java.util.GregorianCalendar.computeTime(GregorianCalendar.java:2482)
	at java.util.Calendar.updateTime(Calendar.java:2265)
	at java.util.Calendar.getTimeInMillis(Calendar.java:1049)
	at java.util.Calendar.getTime(Calendar.java:1022)
	at Prog.main(Prog.java:28)

Interestingly, the code works fine for very similar parameters:

  • dateStr = “1982-01-01”, timeZoneID = “Europe/Berlin”
  • dateStr = “1981-01-01”, timeZoneID = “Asia/Singapore”
  • dateStr = “1982-01-01”, timeZoneID = “Asia/Seoul”

Asia/Seoul is close to Asia/Singapore, but it had no offset change on 1982-01-01, which is why that combination works.

The cause of the error has been explained here:

“When you enter a date with no time, the time is assumed to be 12:00:00 AM.

But there was no 12:00:00 AM on January 1, 1982, in Singapore. After 11:59:59 PM on December 31, 1981, Singapore jumped ahead by half an hour to 12:30 AM. It had previously been at UTC+7:30, but moved to the whole-hour zone of UTC+8.”
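
As a side note, the newer java.time API (Java 8 and later) resolves such a gap instead of throwing an exception. A minimal sketch (unrelated to the Pentaho code, purely to illustrate the gap):

import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class GapDemo {
    public static void main(String[] args) {
        // Midnight on 1982-01-01 did not exist in Singapore: the clocks jumped
        // from 23:59:59 (UTC+7:30) straight to 00:30 (UTC+8).
        ZonedDateTime zdt = ZonedDateTime.of(
                LocalDate.of(1982, 1, 1),
                LocalTime.MIDNIGHT,
                ZoneId.of("Asia/Singapore"));
        // java.time shifts the nonexistent wall time forward by the length of
        // the gap: prints 1982-01-01T00:30+08:00[Asia/Singapore]
        System.out.println(zdt);
    }
}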

How to Obfuscate Password for Command Line Kettle

Pentaho Kettle does not provide a way to obfuscate passwords in batch files (though the connection passwords are obfuscated in XML files).

You can write:

kitchen.bat /rep repos /job test /user admin /pass admin

but this will not work:

kitchen.bat /rep repos /job test /user admin /pass "Encrypted 2be98afc86aa7f2e4cb79ce71da9fa6d4"

Here “Encrypted 2be98afc86aa7f2e4cb79ce71da9fa6d4” is the encrypted version of “admin”. You can get such a string using the Kettle tool encr.bat, as shown below.
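For example (assuming the -kettle option of encr.bat, which encrypts a given password for use with Kettle):

encr.bat -kettle admin

This prints the Encrypted string shown above.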

This post describes a workaround.

Solution

A solution is to create a Java wrapper for Kitchen.

1. Create a Java file Kitchen1.java in the Kettle root directory with the following code:

import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.kitchen.Kitchen;
import org.pentaho.di.core.encryption.Encr;

public class Kitchen1 {
   public static void main(String[] args) throws KettleException {
      // find argument /pass
      int i = 0;
      for (; i < args.length; ++i)
         if (args[i].equals("/pass") || args[i].equals("/password"))
            break;
      // decrypt password if necessary
      if (++i < args.length)
         args[i] = Encr.decryptPasswordOptionallyEncrypted(args[i]);
      // run Kitchen
      Kitchen.main(args);
   }
}

2. Open a command prompt and run kitchen.bat without any parameters. This initializes the CLASSPATH environment variable needed to compile the wrapper.

3. Compile the java file with the following command:

javac Kitchen1.java

4. Copy Kitchen.bat to Kitchen1.bat and correct the last line. Replace

java %OPT% org.pentaho.di.kitchen.Kitchen %_cmdline%

with

java %OPT% Kitchen1 %_cmdline%

Now you can use Kitchen1.bat instead of Kitchen.bat. Both commands will work:

kitchen1.bat /rep repos /job test /user admin /pass admin
kitchen1.bat /rep repos /job test /user admin /pass "Encrypted 2be98afc86aa7f2e4cb79ce71da9fa6d4"

5. Remove the source file Kitchen1.java and close the command prompt.

Have fun 🙂

Handling Records Removal in Dimension Tables

Records can be removed from a table in an operational system, but we never remove records from dimension tables. This post describes how the removal of records can be handled in dimension tables.

Let’s consider an example. There is a list of contact persons in a source system. The list is loaded into a dimension table in a data warehouse. Contact persons can be removed from the operational system, but not from the dimension table. The task is to list the current contact persons based on the data from the data warehouse. What can help us identify the current records?

Creating Aggregate Tables

This post demonstrates an example of how to create aggregate tables in Kettle.

Let’s assume the original transformation loading the delta is the following:

It reads data from the source database, applies some transformations, and loads the data into the data warehouse.

The source table JobEntry has two grouping fields, project_id and employee_id, and one additive measure, registered_hrs. The task is to update the aggregate tables (one by project_id and one by employee_id) without completely rebuilding them.

Before updating the fact table in the data warehouse, we retain the current value from the fact table (step Get previous fact value). After the fact table is updated, we update the aggregate tables: we calculate the difference between the new and the old value (step Calculate change), summarize the change at the necessary granularity (steps Sort rows, Aggregate), and add the change to the value in the aggregate table (steps Get old agg.value, Calculate new value, Insert/update agg table). For example, if a fact row changes from 5.0 to 7.5 hours, the change +2.5 is added to both aggregates; see the sketch below. The transformation may look like this:
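
To make the delta arithmetic concrete, here is a minimal in-memory sketch in Java (a hypothetical class, with maps standing in for the fact and aggregate tables; in Kettle the work is done by the steps named above):

import java.util.HashMap;
import java.util.Map;

public class AggregateDelta {
    // stand-ins for f_jobentry, f_jobentry_project and f_jobentry_employee
    static Map<String, Double> fact = new HashMap<>();
    static Map<Integer, Double> byProject = new HashMap<>();
    static Map<Integer, Double> byEmployee = new HashMap<>();

    static void upsert(int projectId, int employeeId, double newHrs) {
        String key = projectId + ":" + employeeId;
        double oldHrs = fact.getOrDefault(key, 0.0);       // Get previous fact value
        double change = newHrs - oldHrs;                   // Calculate change
        fact.put(key, newHrs);                             // update the fact table
        byProject.merge(projectId, change, Double::sum);   // Insert/update agg table
        byEmployee.merge(employeeId, change, Double::sum); // Insert/update agg table
    }

    public static void main(String[] args) {
        upsert(1, 2, 5.0);
        upsert(1, 2, 7.5);              // change = +2.5
        System.out.println(byProject);  // {1=7.5}
        System.out.println(byEmployee); // {2=7.5}
    }
}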

Demo Data (SQL Server)

The source database table:

create table JobEntry (
  project_id int,
  employee_id int,
  registered_hrs numeric(22,2)
)

The script changing the table (updating an existing row or inserting a new one):

declare
   @project_id int = RAND() * 3,
   @employee_id int = RAND() * 3,
   @registered_hrs numeric(22,2) = RAND() * 10

declare @cnt int = (
      select COUNT(*) from JobEntry
      where employee_id = @employee_id
      and project_id = @project_id
   )

if @cnt = 0
begin
   insert JobEntry values (
      @project_id,
      @employee_id,
      @registered_hrs
   )
end
else begin
   update JobEntry set registered_hrs = @registered_hrs
   where employee_id = @employee_id
   and project_id = @project_id
end

select * from JobEntry

The transformation

The data warehouse tables:

create table f_jobentry(
  project_id int,
  employee_id int,
  registered_hrs decimal(22,2)
);
create table f_jobentry_employee (
  employee_id int,
  registered_hrs decimal(22,2)
);
create table f_jobentry_project (
  project_id int,
  registered_hrs decimal(22,2)
);

Kettle transformation

Test

  • Create the necessary tables.
  • Run the script generating data.
  • Run the transformation updating the fact table and the aggregate tables.
  • Check the output tables.
  • Run the script again.
  • Run the transformation again.
  • Check the output tables again.

Summary

This approach seems too complex. Maybe completely rebuilding the aggregate tables is not so bad after all…

Use of Index and Characterset

This post demonstrates the influence of the character set on the use of indexes. See also Language and Use of Indexes in Oracle.

/*
DROP TABLE t;
ALTER SESSION SET NLS_LANGUAGE='AMERICAN';
*/

SELECT PARAMETER, VALUE FROM V$NLS_PARAMETERS;

CREATE TABLE t (x VARCHAR2(255));

INSERT INTO t VALUES ('a');
INSERT INTO t VALUES ('b');
INSERT INTO t VALUES ('c');
INSERT INTO t VALUES ('d');

CREATE INDEX t_idx ON t (x);

-- the index is used
UPDATE t SET x='a' WHERE x='a';

ALTER SESSION SET NLS_LANGUAGE='CZECH';

-- the index is still used if NLS_CHARACTERSET=WE8MSWIN1252
-- the index is not used if NLS_CHARACTERSET=WE8ISO8859P1
UPDATE t SET x='a' WHERE x='a';

Language and Use of Indexes in Oracle

Symptoms

Loading a data warehouse on an Oracle database with Kettle takes a very long time when the language in the Windows Region and Language settings is different from English (United States).

Cause

  • The Oracle JDBC driver used by Kettle issues the command ALTER SESSION SET NLS_LANGUAGE and changes the default session language.
  • Changing NLS_LANGUAGE also changes the Oracle parameter NLS_SORT from BINARY to the specified language.
  • When NLS_COMP is equal to ANSI and the value of NLS_SORT is not BINARY, linguistic comparison is used in WHERE clauses.
  • With linguistic comparison in effect, a full table scan is used for conditions on VARCHAR2 columns instead of an index range scan.

Resolution

Add “-Duser.country=en -Duser.language=en” to the Java runtime options in the Kettle batch files, as sketched below.
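
For example, in Kitchen.bat the options can be appended to the OPT variable (the exact line differs between PDI versions; this is only a sketch):

set OPT=%OPT% "-Duser.country=en" "-Duser.language=en"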

Experiment

When NLS_SORT is BINARY, the index is used.

SELECT PARAMETER, VALUE FROM V$NLS_PARAMETERS
WHERE PARAMETER IN ('NLS_LANGUAGE', 'NLS_SORT', 'NLS_COMP');

UPDATE F_JOB_BUDGET_LINE SET ACTUAL_COST=0
WHERE INSTANCE_KEY='JobBudgetLine00000000000004835665';


When the parameter NLS_LANGUAGE is changed, NLS_SORT is changed as well. The index is not used when NLS_SORT=CZECH and NLS_COMP=ANSI.

ALTER SESSION SET NLS_LANGUAGE='CZECH';
SELECT PARAMETER, VALUE FROM V$NLS_PARAMETERS
WHERE PARAMETER IN ('NLS_LANGUAGE', 'NLS_SORT');

UPDATE F_JOB_BUDGET_LINE SET ACTUAL_COST=0
WHERE INSTANCE_KEY='JobBudgetLine00000000000004835665';


We can change NLS_SORT back to BINARY to make the index used again. Alternatively, we can set NLS_COMP to BINARY; this has the same effect.

ALTER SESSION SET NLS_SORT='BINARY';
SELECT PARAMETER, VALUE FROM V$NLS_PARAMETERS
WHERE PARAMETER IN ('NLS_LANGUAGE', 'NLS_SORT');

UPDATE F_JOB_BUDGET_LINE SET ACTUAL_COST=0
WHERE INSTANCE_KEY='JobBudgetLine00000000000004835665';

Io exception: Socket read timed out

If you get the following message, check the firewall settings.

ERROR 30-09 09:13:36,485 - YOUR_LOG_CONNECTION - Error disconnecting from database:
Error comitting connection
 Io exception: Socket read timed out

When a transformation/job is started, Kettle opens a connection for logging. The transformation can run for hours; the logging connection stays idle all this time and may be dropped by the firewall. When the transformation finishes, is stopped by the user, or fails with an error, Kettle tries to update the log, notices that the connection has been dropped, and displays the error messages above.

Influence of Nr of rows in rowset on Merge Join

Let’s consider a transformation that merges two flows:

Here are some experiments with different Nr of rows in rowset:

So the speed of the Merge join step depends on the parameter Nr of rows in rowset, and it should be reasonably high (3000K in these experiments).

Note that if the parameter is set too high, the transformation might fail with the exception java.lang.OutOfMemoryError: Java heap space.
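
If that happens, you can raise the Java heap limit by increasing the -Xmx value in the OPT variable of the Kettle batch file. A sketch (your OPT line contains more options, and 2048m is only an illustrative value):

set OPT=-Xmx2048m -cp %CLASSPATH%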