Local and Cloud
[1]:
from pathlib_mate import Path
import polars as pl
from s3pathlib import S3Path, context
from boto_session_manager import BotoSesManager
dir_here = Path.cwd().absolute()
bsm = BotoSesManager(profile_name="bmt_app_dev_us_east_1")
context.attach_boto_session(boto_ses=bsm.boto_ses)
bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
s3dir_root = S3Path(
    f"s3://{bucket}/projects/learn_polars"
    f"/input-and-output/local-and-cloud/"
).to_dir()
[2]:
n_rows = 1000
df = pl.DataFrame(
    {
        "id": list(range(1, 1 + n_rows)),
        "name": ["Alice"] * n_rows,
        "details": [{"dob": "2000-01-01"}] * n_rows,
    }
)
df
[2]:
| id | name | details |
|---|---|---|
| i64 | str | struct[1] |
| 1 | "Alice" | {"2000-01-01"} |
| 2 | "Alice" | {"2000-01-01"} |
| 3 | "Alice" | {"2000-01-01"} |
| 4 | "Alice" | {"2000-01-01"} |
| 5 | "Alice" | {"2000-01-01"} |
| … | … | … |
| 996 | "Alice" | {"2000-01-01"} |
| 997 | "Alice" | {"2000-01-01"} |
| 998 | "Alice" | {"2000-01-01"} |
| 999 | "Alice" | {"2000-01-01"} |
| 1000 | "Alice" | {"2000-01-01"} |
Local File IO
[3]:
def clean_up_local():
    # Remove every JSON file that earlier cells wrote to the working directory
    for f in dir_here.glob("*.json"):
        f.unlink()
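Write to Local File
Extra dependencies
polars needs no extra dependencies to read and write local files.
Saving memory
Many IO workloads process data as a stream instead of loading everything at once, in order to save memory. This usually trades some performance for a smaller memory footprint. A typical use case:
Read a large file line by line, process each line, and write the processed lines to a new file one at a time.
When you write a large DataFrame to a local file, Python's default IO already writes in a streaming fashion, so you do not need to worry about memory.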
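The line-by-line pattern described above can be sketched with the standard library alone. This is a minimal illustration, not part of the original notebook: the function name `transform_lines`, the file names, and the upper-casing step are all hypothetical stand-ins for real processing logic.

```python
import json
import tempfile
from pathlib import Path


def transform_lines(src: Path, dst: Path) -> int:
    """Copy an NDJSON file record by record, upper-casing the "name" field."""
    n_written = 0
    with src.open("r", encoding="utf-8") as fin, dst.open("w", encoding="utf-8") as fout:
        for line in fin:  # the file object yields one line at a time
            record = json.loads(line)
            record["name"] = record["name"].upper()  # placeholder processing step
            fout.write(json.dumps(record) + "\n")
            n_written += 1
    return n_written


with tempfile.TemporaryDirectory() as tmp_dir:
    src = Path(tmp_dir) / "input.json"
    dst = Path(tmp_dir) / "output.json"
    src.write_text(
        '{"id": 1, "name": "Alice"}\n{"id": 2, "name": "Bob"}\n',
        encoding="utf-8",
    )
    n_records = transform_lines(src, dst)
    output_lines = dst.read_text(encoding="utf-8").splitlines()
```

Only one record is held in memory at a time, which is the trade-off the note describes: less memory, at the cost of per-line overhead.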
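Parameters
The DataFrame write methods, such as write_parquet and write_ndjson, accept:
A string: treated as the path to a local file.
A file-like object: for example an open binary file handle or an io.BytesIO buffer.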
[4]:
clean_up_local()
# Write to Local File
path = dir_here / "data.json"
df.write_ndjson(str(path))
print(f"{path.exists() = }, {path.size_in_text = }")
path.exists() = True, path.size_in_text = '55.56 KB'
Read from Local File
[5]:
# Read from Local File
pl.read_ndjson(str(path))
[5]:
| id | name | details |
|---|---|---|
| i64 | str | struct[1] |
| 1 | "Alice" | {"2000-01-01"} |
| 2 | "Alice" | {"2000-01-01"} |
| 3 | "Alice" | {"2000-01-01"} |
| 4 | "Alice" | {"2000-01-01"} |
| 5 | "Alice" | {"2000-01-01"} |
| … | … | … |
| 996 | "Alice" | {"2000-01-01"} |
| 997 | "Alice" | {"2000-01-01"} |
| 998 | "Alice" | {"2000-01-01"} |
| 999 | "Alice" | {"2000-01-01"} |
| 1000 | "Alice" | {"2000-01-01"} |
[6]:
with path.open("rb") as f:
    df = pl.read_ndjson(f)
df
[6]:
| id | name | details |
|---|---|---|
| i64 | str | struct[1] |
| 1 | "Alice" | {"2000-01-01"} |
| 2 | "Alice" | {"2000-01-01"} |
| 3 | "Alice" | {"2000-01-01"} |
| 4 | "Alice" | {"2000-01-01"} |
| 5 | "Alice" | {"2000-01-01"} |
| … | … | … |
| 996 | "Alice" | {"2000-01-01"} |
| 997 | "Alice" | {"2000-01-01"} |
| 998 | "Alice" | {"2000-01-01"} |
| 999 | "Alice" | {"2000-01-01"} |
| 1000 | "Alice" | {"2000-01-01"} |
Read from Multiple Files
This example shows how to work with multiple files.
polars does not automatically split a large DataFrame into multiple small files when writing. You have to divide the DataFrame into smaller ones yourself and write them one by one.
If you need automatic partitioned writes, see pyarrow.dataset.write_dataset.
The polars lazy API provides a family of polars.scan_${data_format} functions (for example polars.scan_ndjson) that can read data from multiple files.
[7]:
clean_up_local()
# Write to Local File
for i in range(1, 1 + 3):
    path = dir_here / f"data{i}.json"
    df.write_ndjson(str(path))
# Read from Local File
pl.scan_ndjson(f"{dir_here}/*.json").collect()
[7]:
| id | name | details |
|---|---|---|
| i64 | str | struct[1] |
| 1 | "Alice" | {"2000-01-01"} |
| 2 | "Alice" | {"2000-01-01"} |
| 3 | "Alice" | {"2000-01-01"} |
| 4 | "Alice" | {"2000-01-01"} |
| 5 | "Alice" | {"2000-01-01"} |
| … | … | … |
| 996 | "Alice" | {"2000-01-01"} |
| 997 | "Alice" | {"2000-01-01"} |
| 998 | "Alice" | {"2000-01-01"} |
| 999 | "Alice" | {"2000-01-01"} |
| 1000 | "Alice" | {"2000-01-01"} |
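Cloud Storage IO
For write operations, if you want to use the API described in the official Cloud storage documentation, polars relies on extra dependencies to provide the IO abstraction layer and asynchronous API calls:
fsspec
s3fs (for AWS)
adlfs (for Azure)
gcsfs (for GCP)
If you are willing to write the binary data to a buffer with the io library first and then upload it with each cloud's native API, or to write to a local file first and then upload it with the native API, you do not need the fsspec family of libraries.
For read operations, polars does not depend on the fsspec family of libraries (and therefore not, indirectly, on libraries such as aiobotocore). For example, read_parquet and scan_parquet (and likewise the functions for other file formats) take a storage_options parameter that lets you control credentials and the underlying API calls.
I do not really understand why write operations cannot, like reads, avoid every dependency other than the cloud's native SDK. Support for this may be added in a future version.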
Reference:
[8]:
def clean_up_s3():
    s3dir_root.delete()
Write to AWS S3
[9]:
s3path = s3dir_root / "data.json"
Method 1, use io.BytesIO
[10]:
clean_up_s3()
import io
buffer = io.BytesIO()
df.write_ndjson(buffer)
print(f"Write to {s3path.uri}")
s3path.write_bytes(buffer.getvalue())
# Read from S3
pl.read_ndjson(s3path.uri)
Write to s3://bmt-app-dev-us-east-1-data/projects/learn_polars/input-and-output/local-and-cloud/data.json
[10]:
| id | name | details |
|---|---|---|
| i64 | str | struct[1] |
| 1 | "Alice" | {"2000-01-01"} |
| 2 | "Alice" | {"2000-01-01"} |
| 3 | "Alice" | {"2000-01-01"} |
| 4 | "Alice" | {"2000-01-01"} |
| 5 | "Alice" | {"2000-01-01"} |
| … | … | … |
| 996 | "Alice" | {"2000-01-01"} |
| 997 | "Alice" | {"2000-01-01"} |
| 998 | "Alice" | {"2000-01-01"} |
| 999 | "Alice" | {"2000-01-01"} |
| 1000 | "Alice" | {"2000-01-01"} |
Method 2, write to local file then upload
[11]:
clean_up_s3()
print(f"Write to {path.basename!r}")
df.write_ndjson(str(path))
print(f"Upload to {s3path.uri}")
bsm.s3_client.upload_file(str(path), Bucket=bucket, Key=s3path.key)
# Read from S3
pl.read_ndjson(s3path.uri)
Write to 'data3.json'
Upload to s3://bmt-app-dev-us-east-1-data/projects/learn_polars/input-and-output/local-and-cloud/data.json
[11]:
| id | name | details |
|---|---|---|
| i64 | str | struct[1] |
| 1 | "Alice" | {"2000-01-01"} |
| 2 | "Alice" | {"2000-01-01"} |
| 3 | "Alice" | {"2000-01-01"} |
| 4 | "Alice" | {"2000-01-01"} |
| 5 | "Alice" | {"2000-01-01"} |
| … | … | … |
| 996 | "Alice" | {"2000-01-01"} |
| 997 | "Alice" | {"2000-01-01"} |
| 998 | "Alice" | {"2000-01-01"} |
| 999 | "Alice" | {"2000-01-01"} |
| 1000 | "Alice" | {"2000-01-01"} |
Read from AWS S3
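The Cloud Storage IO notes mention that read functions such as scan_parquet take a storage_options parameter for credentials. A minimal sketch of passing explicit AWS credentials that way follows. The helper names `build_s3_storage_options` and `scan_parquet_on_s3` are hypothetical, the credential values are placeholders, and the `aws_session_token` key is assumed to be accepted alongside the three keys shown in the polars cloud storage guide.

```python
from typing import Dict, Optional


def build_s3_storage_options(
    access_key_id: str,
    secret_access_key: str,
    region: str,
    session_token: Optional[str] = None,
) -> Dict[str, str]:
    """Build a storage_options dict from explicit AWS credentials."""
    options = {
        "aws_access_key_id": access_key_id,
        "aws_secret_access_key": secret_access_key,
        "aws_region": region,
    }
    if session_token is not None:
        # Only needed for temporary (STS) credentials; key name is an assumption.
        options["aws_session_token"] = session_token
    return options


def scan_parquet_on_s3(uri: str, storage_options: Dict[str, str]):
    """Return a LazyFrame over a Parquet object on S3 (no data is read yet)."""
    import polars as pl  # imported here so the helper above has no polars dependency

    return pl.scan_parquet(uri, storage_options=storage_options)


# Placeholder credentials for illustration only; never hard-code real secrets.
example_options = build_s3_storage_options(
    "EXAMPLE_KEY_ID", "example-secret", "us-east-1"
)
```

With real credentials, `scan_parquet_on_s3("s3://<bucket>/<key>.parquet", example_options).collect()` would materialize the data. When the environment already provides credentials, as in the earlier cells that call `pl.read_ndjson(s3path.uri)` directly, the extra dict may not be needed.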