Local and Cloud#

[1]:
from pathlib_mate import Path

import polars as pl
from s3pathlib import S3Path, context
from boto_session_manager import BotoSesManager

dir_here = Path.cwd().absolute()

bsm = BotoSesManager(profile_name="bmt_app_dev_us_east_1")
context.attach_boto_session(boto_ses=bsm.boto_ses)
bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
s3dir_root = S3Path(
    f"s3://{bucket}/projects/learn_polars"
    f"/input-and-output/local-and-cloud/"
).to_dir()
[2]:
n_rows = 1000
df = pl.DataFrame(
    {
        "id": list(range(1, 1 + n_rows)),
        "name": ["Alice"] * n_rows,
        "details": [{"dob": "2000-01-01"}] * n_rows
    }
)
df
[2]:
shape: (1_000, 3)
id    name     details
i64   str      struct[1]
1     "Alice"  {"2000-01-01"}
2     "Alice"  {"2000-01-01"}
3     "Alice"  {"2000-01-01"}
4     "Alice"  {"2000-01-01"}
5     "Alice"  {"2000-01-01"}
…     …        …
996   "Alice"  {"2000-01-01"}
997   "Alice"  {"2000-01-01"}
998   "Alice"  {"2000-01-01"}
999   "Alice"  {"2000-01-01"}
1000  "Alice"  {"2000-01-01"}

Local File IO#

[3]:
def clean_up_local():
    for f in dir_here.glob("*.json"):
        f.unlink()

Write to Local File#

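Extra dependencies

polars can read and write local files without any extra dependencies.

Saving memory

Many IO scenarios use a data stream instead of reading all the data at once in order to save memory. This approach usually trades some performance for a smaller memory footprint. A typical use case:

  • Read a large file line by line, process the data line by line, and write the processed data to a new file line by line.

When you write a large DataFrame to a local file, Python's default IO already writes in a streaming fashion, so you do not need to worry about memory.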
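The line-by-line pattern described above can be sketched with the standard library alone. This is a minimal illustration and not part of polars: the helper name `transform_ndjson`, the file names, and the upper-casing step are all hypothetical and stand in for whatever per-row processing you need.

```python
import json
import tempfile
from pathlib import Path


def transform_ndjson(src: Path, dst: Path) -> int:
    """Stream ``src`` to ``dst`` one line at a time (hypothetical helper).

    Only a single record is held in memory at any moment.
    Returns the number of records written.
    """
    n_written = 0
    with src.open("r", encoding="utf-8") as fin, dst.open("w", encoding="utf-8") as fout:
        for line in fin:
            if not line.strip():
                continue  # skip blank lines
            record = json.loads(line)
            # Example per-row processing step (an assumption for illustration).
            record["name"] = record["name"].upper()
            fout.write(json.dumps(record) + "\n")
            n_written += 1
    return n_written


with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "input.json"
    dst = Path(tmp) / "output.json"
    # Create a tiny NDJSON input file with three rows.
    src.write_text(
        "".join(json.dumps({"id": i, "name": "Alice"}) + "\n" for i in range(1, 4)),
        encoding="utf-8",
    )
    n_written = transform_ndjson(src, dst)
    first_row = json.loads(dst.read_text(encoding="utf-8").splitlines()[0])
```

The trade-off is the one stated above: each row is parsed and serialized individually, which is slower than a bulk read but keeps the memory footprint roughly constant regardless of file size.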

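Parameters

The DataFrame write methods, such as DataFrame.write_parquet and the DataFrame.write_ndjson used below, accept:

  1. A string: the string is treated as the path of a local file.

  2. A file-like object: you can also pass a file-like object as the write target.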

[4]:
clean_up_local()

# Write to Local File
path = dir_here / "data.json"
df.write_ndjson(str(path))
print(f"{path.exists() = }, {path.size_in_text = }")
path.exists() = True, path.size_in_text = '55.56 KB'

Read from Local File#

[5]:
# Read from Local File
pl.read_ndjson(str(path))
[5]:
shape: (1_000, 3)
id    name     details
i64   str      struct[1]
1     "Alice"  {"2000-01-01"}
2     "Alice"  {"2000-01-01"}
3     "Alice"  {"2000-01-01"}
4     "Alice"  {"2000-01-01"}
5     "Alice"  {"2000-01-01"}
…     …        …
996   "Alice"  {"2000-01-01"}
997   "Alice"  {"2000-01-01"}
998   "Alice"  {"2000-01-01"}
999   "Alice"  {"2000-01-01"}
1000  "Alice"  {"2000-01-01"}
[6]:
with path.open("rb") as f:
    df = pl.read_ndjson(f)
df
[6]:
shape: (1_000, 3)
id    name     details
i64   str      struct[1]
1     "Alice"  {"2000-01-01"}
2     "Alice"  {"2000-01-01"}
3     "Alice"  {"2000-01-01"}
4     "Alice"  {"2000-01-01"}
5     "Alice"  {"2000-01-01"}
…     …        …
996   "Alice"  {"2000-01-01"}
997   "Alice"  {"2000-01-01"}
998   "Alice"  {"2000-01-01"}
999   "Alice"  {"2000-01-01"}
1000  "Alice"  {"2000-01-01"}

Read from Multiple Files#

This example shows how to work with multiple files.

polars does not automatically split a large DataFrame into multiple small files on write. You have to divide the DataFrame into smaller ones and write them one by one.

If you really want automatic splitting, take a look at pyarrow.dataset.write_dataset.

The polars lazy API has a family of functions, polars.scan_${data_format}, that can read data from multiple files.

[7]:
clean_up_local()

# Write to Local File
for i in range(1, 1+3):
    path = dir_here / f"data{i}.json"
    df.write_ndjson(str(path))

# Read from Local File
pl.scan_ndjson(f"{dir_here}/*.json").collect()
[7]:
shape: (3_000, 3)
id    name     details
i64   str      struct[1]
1     "Alice"  {"2000-01-01"}
2     "Alice"  {"2000-01-01"}
3     "Alice"  {"2000-01-01"}
4     "Alice"  {"2000-01-01"}
5     "Alice"  {"2000-01-01"}
…     …        …
996   "Alice"  {"2000-01-01"}
997   "Alice"  {"2000-01-01"}
998   "Alice"  {"2000-01-01"}
999   "Alice"  {"2000-01-01"}
1000  "Alice"  {"2000-01-01"}

Cloud Storage IO#

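  • For write operations, if you want to use the API shown in the official cloud storage documentation, polars relies on the extra dependencies fsspec, s3fs (for AWS), adlfs (for Azure), and gcsfs (for GCP) to provide the IO abstraction layer and asynchronous API calls. However, if you are willing to first write the binary data into a buffer with the io library and then send it with the cloud provider's native API, or to write a local file first and then upload it with the native API, you do not need the fsspec family of libraries.

  • For read operations, polars does not depend on the fsspec family of libraries (and therefore does not indirectly depend on libraries such as aiobotocore). For example, read_parquet and scan_parquet (the same applies to other file formats) take a storage_options parameter that lets you control the credentials and the underlying API calls.

I do not really understand why write operations cannot, like reads, avoid every dependency other than the cloud provider's native SDK. Perhaps this will be supported in a future version.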
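To make the storage_options parameter more concrete, here is a hedged sketch of how such a dictionary might be assembled for AWS S3. The helper `make_s3_storage_options` is hypothetical, the credential values are placeholders, and the key names follow the AWS examples in the polars cloud storage documentation as best understood here, so verify them against the version you have installed.

```python
from typing import Dict, Optional


def make_s3_storage_options(
    access_key: str,
    secret_key: str,
    region: str,
    session_token: Optional[str] = None,
) -> Dict[str, str]:
    """Build a storage_options dict for S3 reads (hypothetical helper)."""
    options = {
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        "aws_region": region,
    }
    # Temporary credentials (for example from an assumed role) carry a token.
    if session_token is not None:
        options["aws_session_token"] = session_token
    return options


# Placeholder values only, not real credentials.
storage_options = make_s3_storage_options(
    access_key="EXAMPLE_ACCESS_KEY",
    secret_key="EXAMPLE_SECRET_KEY",
    region="us-east-1",
)

# Usage (not executed here, it needs a real bucket and valid credentials;
# the bucket name and prefix are placeholders):
# import polars as pl
# lf = pl.scan_parquet(
#     "s3://example-bucket/example-prefix/*.parquet",
#     storage_options=storage_options,
# )
```

Passing credentials explicitly like this is mainly useful when the default credential chain does not pick up the profile you want. In this notebook the values could come from the boto session that is already attached at the top.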

Reference:

[8]:
def clean_up_s3():
    s3dir_root.delete()

Write to AWS S3#

[9]:
s3path = s3dir_root / "data.json"

Method 1, use io.BytesIO

[10]:
clean_up_s3()

import io

buffer = io.BytesIO()
df.write_ndjson(buffer)
print(f"Write to {s3path.uri}")
s3path.write_bytes(buffer.getvalue())

# Read from S3
pl.read_ndjson(s3path.uri)
Write to s3://bmt-app-dev-us-east-1-data/projects/learn_polars/input-and-output/local-and-cloud/data.json
[10]:
shape: (1_000, 3)
id    name     details
i64   str      struct[1]
1     "Alice"  {"2000-01-01"}
2     "Alice"  {"2000-01-01"}
3     "Alice"  {"2000-01-01"}
4     "Alice"  {"2000-01-01"}
5     "Alice"  {"2000-01-01"}
…     …        …
996   "Alice"  {"2000-01-01"}
997   "Alice"  {"2000-01-01"}
998   "Alice"  {"2000-01-01"}
999   "Alice"  {"2000-01-01"}
1000  "Alice"  {"2000-01-01"}

Method 2, write to local file then upload

[11]:
clean_up_s3()

print(f"Write to {path.basename!r}")
df.write_ndjson(str(path))
print(f"Upload to {s3path.uri}")
bsm.s3_client.upload_file(str(path), Bucket=bucket, Key=s3path.key)

# Read from S3
pl.read_ndjson(s3path.uri)
Write to 'data3.json'
Upload to s3://bmt-app-dev-us-east-1-data/projects/learn_polars/input-and-output/local-and-cloud/data.json
[11]:
shape: (1_000, 3)
id    name     details
i64   str      struct[1]
1     "Alice"  {"2000-01-01"}
2     "Alice"  {"2000-01-01"}
3     "Alice"  {"2000-01-01"}
4     "Alice"  {"2000-01-01"}
5     "Alice"  {"2000-01-01"}
…     …        …
996   "Alice"  {"2000-01-01"}
997   "Alice"  {"2000-01-01"}
998   "Alice"  {"2000-01-01"}
999   "Alice"  {"2000-01-01"}
1000  "Alice"  {"2000-01-01"}

Read from AWS S3#
