danfo.streamCsvTransformer
A pipeline transformer to stream a CSV file from local storage, transform it with a custom transformer, and write to the output stream. Only available in Node.js
danfo.streamCsvTransformer(inputFilePath, transformer, options)
Parameters
Type
Description
inputFilePath
String
The path to the CSV file to stream from.
transformer
Function
The transformer function to apply to each row.
Note that each row of the CSV file is passed as a DataFrame with a single row to the transformer function, and the transformer function is expected to return a transformed DataFrame.
options
object
Configuration options for the pipeline. These include:
  • outputFilePath The local file path to write the transformed CSV file to.
  • customCSVStreamWriter A custom CSV stream writer function. This is applied at the end of each transform. If not provided, a default CSV stream writer is used, and this writes to local storage.
  • inputStreamOptions Configuration options for the input stream. Supports all Papaparse CSV reader config options.
  • outputStreamOptions Configuration options for the output stream. This is only applied when using the default CSV stream writer. Supports all toCSV options.
Returns:
return A promise that resolves when the pipeline transformation is complete.
The streamCsvTransformer can be used to incrementally transform a CSV file. This is done by:
  • Streaming a CSV file from a local or remote path.
  • Passing each corresponding row as a DataFrame to the specified transformer function.
  • Writing the result to an output stream.

Stream processing a local file

In the example below, we stream a local CSV file (titanic.csv), apply a transformer function, and write the output to the titanicOutLocal file.
The transformer takes each Name column, splits the person's title, and creates a new column from it.
Node
1
import { DataFrame, Series, streamCsvTransformer } from "danfojs-node";
2
import path from "path"
3
4
const inputFilePath = path.join(process.cwd(), "raw_data", "titanic.csv");
5
const outputFilePath = path.join(process.cwd(), "raw_data", "titanicOutLocal.csv");
6
7
/**
8
* A simple function that takes a DataFrame, and transforms the Name column.
9
* */
10
const transformer = (df) => {
11
const titles = df["Name"].map((name) => name.split(".")[0]);
12
const names = df["Name"].map((name) => name.split(".")[1]);
13
df["Name"] = names
14
df.addColumn("titles", titles, { inplace: true })
15
return df
16
}
17
18
streamCsvTransformer(inputFilePath, transformer, {
19
outputFilePath,
20
inputStreamOptions: { header: true }
21
})
Copied!
Output
1
//initial head of titanic.csv before transforming
2
3
PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
4
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
5
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
6
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
7
8
9
//Head of titanicOutLocal.csv after transforming
10
11
PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,titles
12
1,0,3, Owen Harris,male,22,1,0,A/5 21171,7.25,,S,Braund, Mr
13
2,1,1, John Bradley (Florence Briggs Thayer),female,38,1,0,PC 17599,71.2833,C85,C,Cumings, Mrs
14
3,1,3, Laina,female,26,0,0,STON/O2. 3101282,7.925,,S,Heikkinen, Miss
Copied!

Stream processing of remote file

In the example below, we stream a remote CSV file (titanic.csv), apply a transformer function, and write the output to the titanicOutRemote file.
The transformer takes each Name column, splits the person's title, and creates a new column from it.
Node
1
import { DataFrame, Series, streamCsvTransformer } from "danfojs-node";
2
import path from "path"
3
4
const inputFilePath = "https://raw.githubusercontent.com/opensource9ja/danfojs/dev/danfojs-node/tests/samples/titanic.csv"
5
const outputFilePath = path.join(process.cwd(), "raw_data", "titanicOutRemote.csv");
6
7
8
/**
9
* A simple function that takes a DataFrame, and transforms the Name column.
10
* */
11
const transformer = (df) => {
12
const titles = df["Name"].map((name) => name.split(".")[0]);
13
const names = df["Name"].map((name) => name.split(".")[1]);
14
df["Name"] = names
15
df.addColumn("titles", titles, { inplace: true })
16
return df
17
}
18
19
streamCsvTransformer(inputFilePath, transformer, {
20
outputFilePath,
21
inputStreamOptions: { header: true }
22
})
Copied!

Stream processing with a custom writer

If you need custom control of the output writer, then you can provide a pipe-able custom writer. See https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93/
In the example below, we add a custom writer that logs each row. You can extend this to upload each chunk to a database, or any other function you need.
Node
1
const dfd = require('danfojs-node-nightly')
2
const path = require("path")
3
const stream = require("stream")
4
5
const inputFilePath = "https://raw.githubusercontent.com/opensource9ja/danfojs/dev/danfojs-node/tests/samples/titanic.csv"
6
7
const transformer = (df) => {
8
const titles = df["Name"].map((name) => name.split(".")[0]);
9
const names = df["Name"].map((name) => name.split(".")[1]);
10
df["Name"] = names
11
df.addColumn("titles", titles, { inplace: true })
12
return df
13
}
14
let count = 0
15
16
const customWriter = function () {
17
const csvOutputStream = new stream.Writable({ objectMode: true })
18
csvOutputStream._write = (chunk, encoding, callback) => {
19
//Do anything here. For example you can write to online storage DB
20
console.log("Chunk written: " + chunk) // Each chunk is a single-row DataFrame
21
count += 1
22
callback()
23
24
}
25
return csvOutputStream
26
}
27
28
dfd.streamCsvTransformer(
29
inputFilePath,
30
transformer,
31
{
32
customCSVStreamWriter: customWriter,
33
inputStreamOptions: { header: true }
34
})
Copied!
Output
1
//Showing the last log
2
...
3
4
Chunk written:
5
╔════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╗
6
║ │ Survived │ Pclass │ Name │ Sex │ Age │ Siblings/Spouse… │ Parents/Childre… │ Fare │ titles ║
7
╟────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────╢
8
║ 884 │ 0 │ 3 │ Patrick Dooley │ male │ 32 │ 0 │ 0 │ 7.75 │ Mr ║
9
╚════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╝
Copied!
Last modified 13d ago