# danfo.streamCsvTransformer

danfo.**streamCsvTransformer**(func)

| Parameters    | Type     | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| ------------- | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| inputFilePath | Function | The path to the CSV file to stream from.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| transformer   | Function | <p>The transformer function to apply to each row.</p><p>Note that each row of the CSV file is passed as a DataFrame with a single row to the transformer function, and the transformer function is expected to return a transformed DataFrame.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| options       | object   | <p>Configuration options for the pipeline. These include:</p><ul><li><code>outputFilePath</code> The local file path to write the transformed CSV file to.</li><li><code>customCSVStreamWriter</code> A custom CSV stream writer function. This is applied at the end of each transform. If not provided, a default CSV stream writer is used, and this writes to local storage.</li><li><code>inputStreamOptions</code> Configuration options for the input stream. Supports all Papaparse CSV reader config options.</li><li><code>outputStreamOptions</code> Configuration options for the output stream. This is only applied when using the default CSV stream writer. Supports all <code>toCSV</code> options.</li></ul> |

**Returns:**

> A promise that resolves when the pipeline transformation is complete.

The streamCsvTransformer can be used to [incrementally transform](https://en.wikipedia.org/wiki/Stream_processing) a CSV file. This is done by:

* Streaming a CSV file from a local or **remote** path.
* Passing each corresponding row as a DataFrame to the specified transformer function.
* Writing the result to an output stream.

## **Stream processing a local file**

In the example below, we stream a local CSV file (titanic.csv), apply a transformer function, and write the output to **`titanicOutLocal.csv`**.

The transformer takes each `Name` column, splits the person's title, and creates a new column from it.

{% tabs %}
{% tab title="Node" %}

```javascript
import { DataFrame, Series, streamCsvTransformer } from "danfojs-node";
import path from "path"

const inputFilePath = path.join(process.cwd(), "raw_data", "titanic.csv");
const outputFilePath = path.join(process.cwd(), "raw_data", "titanicOutLocal.csv");

/**
 * A simple function that takes a DataFrame, and transforms the Name column.
* */
const transformer = (df) => {
    const titles = df["Name"].map((name) => name.split(".")[0]);
    const names = df["Name"].map((name) => name.split(".")[1]);
    df["Name"] = names
    df.addColumn("titles", titles, { inplace: true })
    return df
}

dfd.streamCsvTransformer(inputFilePath, transformer, {
    outputFilePath,
    inputStreamOptions: { header: false }
})
```

{% endtab %}
{% endtabs %}

{% tabs %}
{% tab title="Output" %}

```
//initial head of titanic.csv before transforming

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S


//Head of titanicOutLocal.csv after transforming

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,titles
1,0,3, Owen Harris,male,22,1,0,A/5 21171,7.25,,S,Braund, Mr
2,1,1, John Bradley (Florence Briggs Thayer),female,38,1,0,PC 17599,71.2833,C85,C,Cumings, Mrs
3,1,3, Laina,female,26,0,0,STON/O2. 3101282,7.925,,S,Heikkinen, Miss
```

{% endtab %}
{% endtabs %}

## **Stream processing of remote file**

In the example below, we stream a remote CSV file (titanic.csv), applies a transformer function, and write the output to the `titanicOutLocal` file.

The transformer takes each `Name` column, splits the person's title, and creates a new column from it.

{% tabs %}
{% tab title="Node" %}

```javascript
import { DataFrame, Series, streamCsvTransformer } from "danfojs-node";
import path from "path"

const inputFilePath = "https://raw.githubusercontent.com/opensource9ja/danfojs/dev/danfojs-node/tests/samples/titanic.csv"
const outputFilePath = path.join(process.cwd(), "raw_data", "titanicOutRemote.csv");


/**
 * A simple function that takes a DataFrame, and transforms the Name column.
* */
const transformer = (df) => {
    const titles = df["Name"].map((name) => name.split(".")[0]);
    const names = df["Name"].map((name) => name.split(".")[1]);
    df["Name"] = names
    df.addColumn("titles", titles, { inplace: true })
    return df
}

dfd.streamCsvTransformer(inputFilePath, transformer, {
    outputFilePath,
    inputStreamOptions: { header: false }
})
```

{% endtab %}
{% endtabs %}

## **Stream processing with a custom writer**

If you need custom control of the output writer, then you can provide a pipe-able custom writer. See <https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93/>

In the example below, we add a custom writer that logs each row. You can extend this to upload each chunk to a database, or any other function you need.

{% tabs %}
{% tab title="Node" %}

```javascript
const dfd = require('danfojs-node-nightly')
const path = require("path")
const stream = require("stream")

const inputFilePath = "https://raw.githubusercontent.com/opensource9ja/danfojs/dev/danfojs-node/tests/samples/titanic.csv"

const transformer = (df) => {
    const titles = df["Name"].map((name) => name.split(".")[0]);
    const names = df["Name"].map((name) => name.split(".")[1]);
    df["Name"] = names
    df.addColumn("titles", titles, { inplace: true })
    return df
}
let count = 0

const customWriter = function () {
    const csvOutputStream = new stream.Writable({ objectMode: true })
    csvOutputStream._write = (chunk, encoding, callback) => {
        //Do anything here. For example you can write to online storage DB
        console.log("Chunk written: " + chunk) // Eah chunk is a row DataFrame
        count += 1
        callback()

    }
    return csvOutputStream
}

dfd.streamCsvTransformer(
    inputFilePath,
    transformer,
    {
        customCSVStreamWriter: customWriter,
        inputStreamOptions: { header: true }
    })
```

{% endtab %}
{% endtabs %}

{% tabs %}
{% tab title="Output" %}

```
//Showing the last log
...

Chunk written: 
╔════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╤═══════════════════╗
║            │ Survived          │ Pclass            │ Name              │ Sex               │ Age               │ Siblings/Spouse…  │ Parents/Childre…  │ Fare              │ titles            ║
╟────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────┼───────────────────╢
║ 884        │ 0                 │ 3                 │  Patrick Dooley   │ male              │ 32                │ 0                 │ 0                 │ 7.75              │ Mr                ║
╚════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╧═══════════════════╝
```

{% endtab %}
{% endtabs %}


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://danfo.jsdata.org/api-reference/general-functions/danfo.streamcsvtransformer.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
