c108.io

File I/O utilities with progress tracking for large file operations.

StreamingFile

Bases: BufferedIOBase

A thread-safe file-like object that tracks read and write progress via callbacks.

This class extends io.BufferedIOBase to add progress tracking for file operations. It's designed to work with cloud storage APIs (like AWS S3 or Google Cloud Storage) that perform large read/write operations on file-like objects.

The class handles large operations by breaking them into smaller chunks to provide frequent progress updates while maintaining full data integrity.
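The chunking idea can be illustrated with a small stand-alone sketch. It is not the class's own code path: `write_in_chunks` and `report` are hypothetical names, and an in-memory `io.BytesIO` stands in for a real file.

```python
import io
from typing import Callable


def write_in_chunks(
    dest: io.BufferedIOBase,
    data: bytes,
    chunk_size: int,
    report: Callable[[int, int], None],
) -> int:
    """Write data to dest in chunk_size slices, reporting after each slice."""
    written = 0
    for start in range(0, len(data), chunk_size):
        # Each slice is at most chunk_size bytes; the last one may be shorter.
        written += dest.write(data[start : start + chunk_size])
        report(written, len(data))
    return written


updates: list[tuple[int, int]] = []
sink = io.BytesIO()
payload = b"x" * 10
total = write_in_chunks(sink, payload, 4, lambda cur, tot: updates.append((cur, tot)))
```

With a 10-byte payload and a 4-byte chunk size, the callback fires three times (after 4, 8, and 10 bytes), and the bytes written are identical to the input.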

Thread Safety

All read, write, and seek operations are protected by an internal lock, making this class safe for concurrent access from multiple threads. Progress counters are atomically updated to prevent race conditions.
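As a rough illustration of the locking pattern described here (a sketch only, with the hypothetical class `LockedCounter` standing in for the progress counters), a re-entrant lock makes each update atomic even when several threads call in at once:

```python
import threading


class LockedCounter:
    """Hypothetical counter guarded by a re-entrant lock."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self.value = 0

    def add(self, n: int) -> int:
        # The read-modify-write happens entirely while the lock is held.
        with self._lock:
            self.value += n
            return self.value


counter = LockedCounter()


def worker() -> None:
    for _ in range(1000):
        counter.add(1)


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The lock guarantees that no update is lost; it does not impose any ordering between threads, so callers that need data in a particular order still have to coordinate that themselves.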

Parameters:

path (int | str | bytes | PathLike[str] | PathLike[bytes], required): Path to the file to open.

mode (str, default 'r'): File mode ('r', 'rb', 'w', 'wb', etc.). Text modes are normalized to their binary equivalents, so data is always read and written as bytes.

callback (Callable[[int, int], None] | None, default None): Function called after each chunk is transferred. It is not called for an empty read or write, or when the total size is 0. If None, a default callback that prints progress to stdout is used. Signature: callback(current_bytes: int, total_bytes: int) -> None

chunk_size (int, default 8 * 1024 * 1024): Size in bytes of each chunk. Defaults to 8 MB. Set to 0 to use the total size (the file size in read mode, expected_size in write mode) as a single chunk, with minimal progress updates. The default often aligns with cloud provider defaults (e.g., AWS S3 multipart uploads default to 8 MB); Google Cloud Storage defaults to a larger 100 MB chunk size for uploads.

expected_size (int | None, default None): Expected total size in bytes for write operations. Required for progress tracking in write mode: if omitted, the total is treated as 0 and the callback is not invoked.

Attributes:

bytes_read (int): Total bytes read from the file (thread-safe).

bytes_written (int): Total bytes written to the file (thread-safe).

chunk_size (int): Size of chunks for operations.

Raises:

ValueError: If path is empty or invalid parameters are provided.

FileNotFoundError: If the file does not exist in read mode.

PermissionError: If the file cannot be accessed due to insufficient permissions.

IsADirectoryError: If path points to a directory instead of a file.

OSError: For other OS-level errors (disk full, I/O errors, etc.).

IOError: For general I/O operation failures. In Python 3, IOError is an alias of OSError.

Notes

The progress reported by the callback reflects the amount of data transferred to or from the underlying client library (e.g., boto3, google-cloud-storage), not the actual network transfer progress. Cloud provider libraries often have their own internal buffering, chunking, and retry mechanisms that are not visible to this class. Therefore, the progress updates indicate how much data the library has consumed from this file-like object, which is a close proxy but not a direct measure of the upload/download to the cloud service.

Example
from c108.io import StreamingFile

# `blob` is assumed to be a google-cloud-storage Blob created elsewhere.

# Reading (e.g., uploading to cloud storage):

def progress(current, total):
    print(f"Progress: {current}/{total} bytes ({current/total*100:.1f}%)")

with StreamingFile('large_file.mp4', 'rb', callback=progress) as f:
    blob.upload_from_file(f)

# Writing (e.g., downloading from cloud storage):
# Download a 100MB file

with StreamingFile('output.mp4', 'wb', callback=progress,
                    expected_size=100*1024*1024) as f:
    blob.download_to_file(f)
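Because the callback runs after every chunk, a caller may prefer to report only at coarser intervals. The following is a minimal sketch of one way to do that; `make_step_reporter` and its `sink` parameter are hypothetical helpers, not part of c108.io. It only relies on the documented callback signature `callback(current_bytes, total_bytes)`.

```python
from typing import Callable


def make_step_reporter(
    step_percent: float, sink: Callable[[str], None] = print
) -> Callable[[int, int], None]:
    """Build a callback(current, total) that reports once per step_percent."""
    last_step = -1

    def report(current: int, total: int) -> None:
        nonlocal last_step
        if total <= 0:
            return  # unknown total: nothing meaningful to report
        step = int((current / total * 100) // step_percent)
        if step > last_step:
            last_step = step
            sink(f"{current}/{total} bytes ({current / total * 100:.0f}%)")

    return report


lines: list[str] = []
report = make_step_reporter(25, lines.append)
for done in range(10, 101, 10):  # simulate ten 10-byte chunks of a 100-byte file
    report(done, 100)
```

A reporter built this way could then be passed as the `callback` argument, for example `callback=make_step_reporter(10)`.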
Source code in c108/io.py
class StreamingFile(io.BufferedIOBase):
    """
    A thread-safe file-like object that tracks read and write progress via callbacks.

    This class extends io.BufferedIOBase to add progress tracking for file operations.
    It's designed to work with cloud storage APIs (like AWS S3 or Google Cloud Storage)
    that perform large read/write operations on file-like objects.

    The class handles large operations by breaking them into smaller chunks
    to provide frequent progress updates while maintaining full data integrity.

    Thread Safety:
        All read, write, and seek operations are protected by an internal lock,
        making this class safe for concurrent access from multiple threads.
        Progress counters are atomically updated to prevent race conditions.

    Args:
        path: Path to the file to open.
        mode: File mode ('r', 'rb', 'w', 'wb', etc.). Defaults to 'r'.
            Text modes are normalized to their binary equivalents.
        callback: Function called after each chunk is transferred. Not called
            for an empty read or write, or when the total size is 0.
            Signature: callback(current_bytes: int, total_bytes: int) -> None
        chunk_size: Size in bytes of each chunk. Defaults to 8 MB.
            Set to 0 to use the total size (single chunk, minimal progress updates).
            The default often aligns with cloud provider defaults (e.g., AWS S3
            multipart uploads default to 8 MB). Google Cloud Storage defaults
            to a larger 100 MB chunk size for uploads.
        expected_size: Expected total size in bytes for write operations.
            Required for progress tracking in write mode; if omitted, the
            total is treated as 0 and the callback is not invoked.

    Attributes:
        bytes_read: Total bytes read from the file (thread-safe).
        bytes_written: Total bytes written to the file (thread-safe).
        chunk_size: Size of chunks for operations.

    Raises:
        ValueError: If path is empty or invalid parameters are provided.
        FileNotFoundError: If the file does not exist in read mode.
        PermissionError: If the file cannot be accessed due to insufficient permissions.
        IsADirectoryError: If path points to a directory instead of a file.
        OSError: For other OS-level errors (disk full, I/O errors, etc.).
        IOError: For general I/O operation failures (alias of OSError in Python 3).

    Notes:
        The progress reported by the callback reflects the amount of data
        transferred to or from the underlying client library (e.g., `boto3`,
        `google-cloud-storage`), not the actual network transfer progress.
        Cloud provider libraries often have their own internal buffering,
        chunking, and retry mechanisms that are not visible to this class.
        Therefore, the progress updates indicate how much data the library has
        consumed from this file-like object, which is a close proxy but not
        a direct measure of the upload/download to the cloud service.

    Example:
        ```
        # Reading (e.g., uploading to cloud storage):

        def progress(current, total):
            print(f"Progress: {current}/{total} bytes ({current/total*100:.1f}%)")

        with StreamingFile('large_file.mp4', 'rb', callback=progress) as f:
            blob.upload_from_file(f)

        # Writing (e.g., downloading from cloud storage):
        # Download a 100MB file

        with StreamingFile('output.mp4', 'wb', callback=progress,
                            expected_size=100*1024*1024) as f:
            blob.download_to_file(f)
        ```
    """

    bytes_read: int
    bytes_written: int
    callback: Callable[[int, int], None]
    chunk_size: int
    _total_size: int
    _mode: str
    _file: io.BufferedReader | io.BufferedWriter
    _lock: threading.RLock

    def __init__(
        self,
        path: int | str | bytes | os.PathLike[str] | os.PathLike[bytes],
        mode: str = "r",
        callback: Callable[[int, int], None] | None = None,
        chunk_size: int = 8 * 1024 * 1024,
        expected_size: int | None = None,
    ) -> None:
        """
        Initialize a StreamingFile with progress tracking.

        Args:
            path: Path to the file to open.
            mode: File mode string (e.g., 'rb', 'wb').
            callback: Optional progress callback function.
            chunk_size: Size of chunks for read/write operations in bytes.
            expected_size: Expected total size for write mode (enables progress tracking).

        Raises:
            ValueError: If path is empty or invalid parameters provided.
            FileNotFoundError: If the file does not exist in read mode.
            PermissionError: If the file cannot be accessed due to insufficient permissions.
            IsADirectoryError: If path points to a directory.
            OSError: For other OS-level errors during file opening.
        """
        if not path:
            raise ValueError("StreamingFile path required")

        # Initialize thread safety lock
        self._lock = threading.RLock()

        # Open the underlying file with appropriate buffering
        self._mode = mode

        # Normalize binary mode
        if "b" not in mode:
            mode = mode.replace("r", "rb").replace("w", "wb").replace("a", "ab")

        # Open raw file and wrap with buffered I/O
        raw_file = io.FileIO(path, mode)

        if "r" in mode:
            self._file = io.BufferedReader(raw_file)
        elif "w" in mode or "a" in mode:
            self._file = io.BufferedWriter(raw_file)
        else:
            self._file = raw_file

        self.callback = callback or self._callback_default

        # Determine total size for progress calculations
        if "w" in mode or "a" in mode:
            # Write/append mode: use expected_size or 0
            self._total_size = expected_size or 0
        else:
            # Read mode: get actual file size
            self._total_size = os.fstat(self._file.fileno()).st_size

        # Set chunk size (use 0 to disable chunking and use file_size)
        if chunk_size == 0:
            self.chunk_size = max(self._total_size, 1)
        else:
            self.chunk_size = max(chunk_size, 1)

        # Initialize progress counters
        self.bytes_read = 0
        self.bytes_written = 0

    @property
    def name(self) -> str:
        """
        Get the name of the file.

        Returns:
            File name or path.
        """
        return self._file.name

    @property
    def mode(self) -> str:
        """
        Get the file mode.

        Returns:
            File mode string.
        """
        return self._mode

    @property
    def closed(self) -> bool:
        """
        Check if the file is closed.

        Returns:
            True if the file is closed, False otherwise.
        """
        return self._file.closed

    def fileno(self) -> int:
        """
        Get the file descriptor.

        Returns:
            Integer file descriptor.

        Raises:
            ValueError: If the file is closed.
            OSError: If the file descriptor is not available.
        """
        return self._file.fileno()

    @property
    def file_size(self) -> int:
        """
        Get the current size of the file in bytes.

        Returns:
            Current file size in bytes.

        Raises:
            ValueError: If file is closed.
            OSError: If unable to stat the file.
        """
        with self._lock:
            if self.closed:
                raise ValueError(f"Cannot get file size, file is closed: {self.name}")
            return os.fstat(self.fileno()).st_size

    @property
    def total_chunks(self) -> int:
        """
        Get the total number of chunks used for file progress.

        Returns:
            Total number of streaming file chunks.
        """
        return _get_chunks_number(self.chunk_size, self.total_size)

    @property
    def total_size(self) -> int:
        """
        Get the total size used for progress tracking.

        For read mode, this is the actual file size.
        For write mode, this is the expected_size provided at initialization.

        Returns:
            Total size in bytes for progress calculations.
        """
        return self._total_size

    @property
    def progress_percent(self) -> float:
        """
        Get the current progress as a percentage (thread-safe).

        Returns:
            Progress percentage (0.0 to 100.0).
        """
        with self._lock:
            if self._total_size == 0:
                return 0.0

            current = self.bytes_read if "r" in self._mode else self.bytes_written
            return (current / self._total_size) * 100.0

    def _callback_default(self, current_bytes: int, total_bytes: int) -> None:
        """
        Default progress callback that prints to stdout.

        Override by providing your own callback function to __init__.

        Args:
            current_bytes: Number of bytes transferred so far.
            total_bytes: Total bytes to transfer.
        """
        mode_str = "Read" if "r" in self._mode else "Write"
        percent = (current_bytes / total_bytes * 100) if total_bytes > 0 else 0
        print(f"{mode_str} Progress: {current_bytes}/{total_bytes} bytes ({percent:.1f}%)")

    def readable(self) -> bool:
        """
        Check if the file is readable.

        Returns:
            True if the file is opened for reading.
        """
        return "r" in self._mode

    def writable(self) -> bool:
        """
        Check if the file is writable.

        Returns:
            True if the file is opened for writing.
        """
        return "w" in self._mode or "a" in self._mode

    def seekable(self) -> bool:
        """
        Check if the file supports seek operations.

        Returns:
            True if the file is seekable.
        """
        return self._file.seekable()

    def read(self, size: int = -1) -> bytes:
        """
        Read up to size bytes from the file with progress tracking (thread-safe).

        This method breaks large reads into chunks to provide frequent
        progress updates via the callback function.

        Args:
            size: Maximum number of bytes to read. -1 means read until EOF.

        Returns:
            Bytes read from the file.

        Raises:
            ValueError: If the file is not open for reading or is closed.
            OSError: For I/O errors during read operations.
            IOError: For general I/O failures.
        """
        with self._lock:
            if not self.readable():
                raise ValueError("File not open for reading")

            # Optimize small reads: read directly without chunking
            if size > 0 and size <= self.chunk_size:
                data = self._file.read(size)
                self.bytes_read += len(data)

                # Only invoke callback if data was actually read and file is not empty
                if self.callback and len(data) > 0 and self._total_size > 0:
                    self.callback(self.bytes_read, self._total_size)

                return data

            # For large reads or read-all (-1), use chunked reading
            buffer = bytearray()
            bytes_remaining = size  # Tracks remaining bytes for this specific read() call

            while True:
                # Determine chunk size for this iteration
                if size == -1:
                    # Read all: use full chunk_size
                    chunk_to_read = self.chunk_size
                else:
                    # Bounded read: read up to remaining bytes
                    if bytes_remaining <= 0:
                        break
                    chunk_to_read = min(self.chunk_size, bytes_remaining)

                # Read the chunk
                chunk = self._file.read(chunk_to_read)

                # EOF or no data
                if not chunk:
                    break

                buffer.extend(chunk)
                self.bytes_read += len(chunk)

                if size != -1:
                    bytes_remaining -= len(chunk)

                # Report progress only if data was read and file is not empty
                if self.callback and self._total_size > 0:
                    self.callback(self.bytes_read, self._total_size)

            return bytes(buffer)

    def write(self, data: bytes) -> int:
        """
        Write bytes to the file with progress tracking (thread-safe).

        For large writes, data is written in chunks to provide frequent
        progress updates via the callback function.

        Args:
            data: Bytes to write to the file.

        Returns:
            Total number of bytes written.

        Raises:
            ValueError: If the file is not open for writing or is closed.
            OSError: For I/O errors during write (disk full, etc.).
            IOError: For general I/O failures.
        """
        with self._lock:
            if not self.writable():
                raise ValueError("File not open for writing")

            total_bytes_to_write = len(data)
            bytes_written_this_call = 0

            # Optimize small writes: write directly without chunking
            if total_bytes_to_write <= self.chunk_size:
                result = self._file.write(data)
                self.bytes_written += result

                # Only invoke callback if data was actually written and expected size is not empty
                if self.callback and result > 0 and self._total_size > 0:
                    self.callback(self.bytes_written, self._total_size)

                return result

            # For large writes, write in chunks
            for i in range(0, total_bytes_to_write, self.chunk_size):
                chunk = data[i : i + self.chunk_size]
                result = self._file.write(chunk)
                bytes_written_this_call += result
                self.bytes_written += result

                # Report progress only if data was written and expected size is not empty
                if self.callback and self._total_size > 0:
                    self.callback(self.bytes_written, self._total_size)

            return bytes_written_this_call

    def seek(self, offset: int, whence: int = 0) -> int:
        """
        Change the file position and update progress counters (thread-safe).

        When seeking, the progress counters are updated to reflect the new
        file position to maintain accurate progress tracking.

        Args:
            offset: Offset in bytes.
            whence: Reference point: 0=start, 1=current position, 2=end.

        Returns:
            New absolute file position in bytes.

        Raises:
            ValueError: If the file is closed or not seekable.
            OSError: For I/O errors during seek operation.
        """
        with self._lock:
            new_position = self._file.seek(offset, whence)

            # Update progress counter to match new position
            if "r" in self._mode:
                self.bytes_read = new_position
            else:
                # For write/append mode, also update position
                # This handles cases where seeking back and overwriting
                self.bytes_written = new_position

            return new_position

    def tell(self) -> int:
        """
        Get the current file position (thread-safe).

        Returns:
            Current file position in bytes.

        Raises:
            ValueError: If the file is closed.
            OSError: For I/O errors.
        """
        with self._lock:
            return self._file.tell()

    def flush(self) -> None:
        """
        Flush write buffers (thread-safe).

        Raises:
            OSError: If flush fails due to I/O errors.
        """
        with self._lock:
            self._file.flush()

    def close(self) -> None:
        """
        Close the file (thread-safe).

        Raises:
            OSError: If close fails due to I/O errors.
        """
        with self._lock:
            if not self.closed:
                self._file.close()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.close()
        return False

closed property

Check if the file is closed.

Returns:

bool: True if the file is closed, False otherwise.

file_size property

Get the current size of the file in bytes.

Returns:

int: Current file size in bytes.

Raises:

ValueError: If the file is closed.

OSError: If unable to stat the file.

mode property

Get the file mode.

Returns:

str: File mode string.

name property

Get the name of the file.

Returns:

str: File name or path.

progress_percent property

Get the current progress as a percentage (thread-safe).

Returns:

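float: Progress percentage, nominally 0.0 to 100.0. Returns 0.0 when the total size is 0. The value is not clamped, so it can exceed 100.0 if more bytes are written than expected_size.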
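The calculation can be sketched as a stand-alone function. This mirrors the arithmetic in the source listing above but is not the property itself; `percent` is a hypothetical name.

```python
def percent(current: int, total: int) -> float:
    """Return current/total as a percentage, or 0.0 when total is unknown."""
    if total == 0:
        return 0.0  # avoids division by zero for empty files or missing expected_size
    return current / total * 100.0


empty = percent(0, 0)
quarter = percent(50, 200)
overshoot = percent(150, 100)  # more bytes transferred than the declared total
```

Callers that display a progress bar may want to clamp the result with `min(value, 100.0)`, since nothing in this calculation caps it.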

total_chunks property

Get the total number of chunks used for file progress.

Returns:

int: Total number of streaming file chunks.
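The helper that computes this value is not shown on this page. Assuming it is a ceiling division of the total size by the chunk size (an assumption, not confirmed by the source), the count could be sketched as follows, with `chunks_needed` as a hypothetical stand-in:

```python
def chunks_needed(chunk_size: int, total_size: int) -> int:
    """Ceiling division without floats; hypothetical stand-in for the helper."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return (total_size + chunk_size - 1) // chunk_size


default_chunk = 8 * 1024 * 1024
count_100mb = chunks_needed(default_chunk, 100 * 1024 * 1024)
count_exact = chunks_needed(4, 8)
count_empty = chunks_needed(4, 0)
```

Under that assumption, a 100 MB file with the default 8 MB chunk size needs 13 chunks: twelve full chunks and one 4 MB remainder. The real helper may treat an empty file differently.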

total_size property

Get the total size used for progress tracking.

For read mode, this is the actual file size. For write mode, this is the expected_size provided at initialization.

Returns:

int: Total size in bytes for progress calculations.

__enter__()

Context manager entry.

Source code in c108/io.py
def __enter__(self):
    """Context manager entry."""
    return self

__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in c108/io.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit."""
    self.close()
    return False

__init__(path, mode='r', callback=None, chunk_size=8 * 1024 * 1024, expected_size=None)

Initialize a StreamingFile with progress tracking.

Parameters:

path (int | str | bytes | PathLike[str] | PathLike[bytes], required): Path to the file to open.

mode (str, default 'r'): File mode string (e.g., 'rb', 'wb').

callback (Callable[[int, int], None] | None, default None): Optional progress callback function.

chunk_size (int, default 8 * 1024 * 1024): Size of chunks for read/write operations in bytes.

expected_size (int | None, default None): Expected total size for write mode (enables progress tracking).

Raises:

ValueError: If path is empty or invalid parameters are provided.

FileNotFoundError: If the file does not exist in read mode.

PermissionError: If the file cannot be accessed due to insufficient permissions.

IsADirectoryError: If path points to a directory.

OSError: For other OS-level errors during file opening.
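The constructor source on this page rewrites text modes to binary before opening the file. A stand-alone sketch of that string transformation, with `to_binary_mode` as a hypothetical name, looks like this:

```python
def to_binary_mode(mode: str) -> str:
    """Add the 'b' flag to a mode string that does not already carry it."""
    if "b" in mode:
        return mode
    # Same chained replacement as in the constructor source.
    return mode.replace("r", "rb").replace("w", "wb").replace("a", "ab")


read_mode = to_binary_mode("r")
write_mode = to_binary_mode("w")
append_mode = to_binary_mode("a")
update_mode = to_binary_mode("r+")
already_binary = to_binary_mode("wb")
```

In the constructor, the original mode string is stored before this rewrite, so the `mode` property reports what the caller passed rather than the normalized value.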

Source code in c108/io.py
def __init__(
    self,
    path: int | str | bytes | os.PathLike[str] | os.PathLike[bytes],
    mode: str = "r",
    callback: Callable[[int, int], None] | None = None,
    chunk_size: int = 8 * 1024 * 1024,
    expected_size: int | None = None,
) -> None:
    """
    Initialize a StreamingFile with progress tracking.

    Args:
        path: Path to the file to open.
        mode: File mode string (e.g., 'rb', 'wb').
        callback: Optional progress callback function.
        chunk_size: Size of chunks for read/write operations in bytes.
        expected_size: Expected total size for write mode (enables progress tracking).

    Raises:
        ValueError: If path is empty or invalid parameters provided.
        FileNotFoundError: If the file does not exist in read mode.
        PermissionError: If the file cannot be accessed due to insufficient permissions.
        IsADirectoryError: If path points to a directory.
        OSError: For other OS-level errors during file opening.
    """
    if not path:
        raise ValueError("StreamingFile path required")

    # Initialize thread safety lock
    self._lock = threading.RLock()

    # Open the underlying file with appropriate buffering
    self._mode = mode

    # Normalize binary mode
    if "b" not in mode:
        mode = mode.replace("r", "rb").replace("w", "wb").replace("a", "ab")

    # Open raw file and wrap with buffered I/O
    raw_file = io.FileIO(path, mode)

    if "r" in mode:
        self._file = io.BufferedReader(raw_file)
    elif "w" in mode or "a" in mode:
        self._file = io.BufferedWriter(raw_file)
    else:
        self._file = raw_file

    self.callback = callback or self._callback_default

    # Determine total size for progress calculations
    if "w" in mode or "a" in mode:
        # Write/append mode: use expected_size or 0
        self._total_size = expected_size or 0
    else:
        # Read mode: get actual file size
        self._total_size = os.fstat(self._file.fileno()).st_size

    # Set chunk size (use 0 to disable chunking and use file_size)
    if chunk_size == 0:
        self.chunk_size = max(self._total_size, 1)
    else:
        self.chunk_size = max(chunk_size, 1)

    # Initialize progress counters
    self.bytes_read = 0
    self.bytes_written = 0

close()

Close the file (thread-safe).

Raises:

OSError: If close fails due to I/O errors.

Source code in c108/io.py
def close(self) -> None:
    """
    Close the file (thread-safe).

    Raises:
        OSError: If close fails due to I/O errors.
    """
    with self._lock:
        if not self.closed:
            self._file.close()

fileno()

Get the file descriptor.

Returns:

int: Integer file descriptor.

Raises:

ValueError: If the file is closed.

OSError: If the file descriptor is not available.

Source code in c108/io.py
def fileno(self) -> int:
    """
    Get the file descriptor.

    Returns:
        Integer file descriptor.

    Raises:
        ValueError: If the file is closed.
        OSError: If the file descriptor is not available.
    """
    return self._file.fileno()

flush()

Flush write buffers (thread-safe).

Raises:

OSError: If flush fails due to I/O errors.

Source code in c108/io.py
def flush(self) -> None:
    """
    Flush write buffers (thread-safe).

    Raises:
        OSError: If flush fails due to I/O errors.
    """
    with self._lock:
        self._file.flush()

read(size=-1)

Read up to size bytes from the file with progress tracking (thread-safe).

This method breaks large reads into chunks to provide frequent progress updates via the callback function.

Parameters:

size (int, default -1): Maximum number of bytes to read. -1 means read until EOF.

Returns:

bytes: Bytes read from the file.

Raises:

ValueError: If the file is not open for reading or is closed.

OSError: For I/O errors during read operations.

IOError: For general I/O failures.
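The read-until-EOF behavior can be sketched independently of the class. This is a simplified illustration, assuming an in-memory source and the hypothetical names `read_with_progress` and `report`; it omits the locking and the small-read shortcut that the real method has.

```python
import io
from typing import BinaryIO, Callable


def read_with_progress(
    src: BinaryIO,
    total_size: int,
    chunk_size: int,
    report: Callable[[int, int], None],
) -> bytes:
    """Read src to EOF in chunk_size pieces, reporting after each piece."""
    buffer = bytearray()
    while True:
        chunk = src.read(chunk_size)
        if not chunk:  # empty bytes means EOF
            break
        buffer.extend(chunk)
        report(len(buffer), total_size)
    return bytes(buffer)


payload = bytes(range(10))
seen: list[int] = []
result = read_with_progress(
    io.BytesIO(payload), len(payload), 4, lambda cur, tot: seen.append(cur)
)
```

No callback fires for the final empty read that signals EOF, which matches the documented rule that empty operations are not reported.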

Source code in c108/io.py
def read(self, size: int = -1) -> bytes:
    """
    Read up to size bytes from the file with progress tracking (thread-safe).

    This method breaks large reads into chunks to provide frequent
    progress updates via the callback function.

    Args:
        size: Maximum number of bytes to read. -1 means read until EOF.

    Returns:
        Bytes read from the file.

    Raises:
        ValueError: If the file is not open for reading or is closed.
        OSError: For I/O errors during read operations.
        IOError: For general I/O failures.
    """
    with self._lock:
        if not self.readable():
            raise ValueError("File not open for reading")

        # Optimize small reads: read directly without chunking
        if size > 0 and size <= self.chunk_size:
            data = self._file.read(size)
            self.bytes_read += len(data)

            # Only invoke callback if data was actually read and file is not empty
            if self.callback and len(data) > 0 and self._total_size > 0:
                self.callback(self.bytes_read, self._total_size)

            return data

        # For large reads or read-all (-1), use chunked reading
        buffer = bytearray()
        bytes_remaining = size  # Tracks remaining bytes for this specific read() call

        while True:
            # Determine chunk size for this iteration
            if size == -1:
                # Read all: use full chunk_size
                chunk_to_read = self.chunk_size
            else:
                # Bounded read: read up to remaining bytes
                if bytes_remaining <= 0:
                    break
                chunk_to_read = min(self.chunk_size, bytes_remaining)

            # Read the chunk
            chunk = self._file.read(chunk_to_read)

            # EOF or no data
            if not chunk:
                break

            buffer.extend(chunk)
            self.bytes_read += len(chunk)

            if size != -1:
                bytes_remaining -= len(chunk)

            # Report progress only if data was read and file is not empty
            if self.callback and self._total_size > 0:
                self.callback(self.bytes_read, self._total_size)

        return bytes(buffer)

readable()

Check if the file is readable.

Returns:

bool: True if the file is opened for reading.

Source code in c108/io.py
def readable(self) -> bool:
    """
    Check if the file is readable.

    Returns:
        True if the file is opened for reading.
    """
    return "r" in self._mode

seek(offset, whence=0)

Change the file position and update progress counters (thread-safe).

When seeking, the progress counters are updated to reflect the new file position to maintain accurate progress tracking.

Parameters:

offset (int, required): Offset in bytes.

whence (int, default 0): Reference point: 0=start, 1=current position, 2=end.

Returns:

int: New absolute file position in bytes.

Raises:

ValueError: If the file is closed or not seekable.

OSError: For I/O errors during seek operation.
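The effect of seeking on the progress counter can be seen in a reduced sketch. `TrackedReader` is a hypothetical wrapper around `io.BytesIO`, not the class documented here, but it follows the same rule: after a seek, the counter equals the new file position.

```python
import io


class TrackedReader:
    """Hypothetical wrapper: the progress counter follows the file position."""

    def __init__(self, raw: io.BytesIO) -> None:
        self._raw = raw
        self.bytes_read = 0

    def read(self, size: int = -1) -> bytes:
        data = self._raw.read(size)
        self.bytes_read += len(data)
        return data

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        position = self._raw.seek(offset, whence)
        self.bytes_read = position  # counter is reset to the new position
        return position


reader = TrackedReader(io.BytesIO(b"0123456789"))
reader.read(6)
after_read = reader.bytes_read
reader.seek(0)
after_rewind = reader.bytes_read
reader.seek(-3, io.SEEK_END)
after_seek_end = reader.bytes_read
```

One consequence of this design is that reported progress can move backwards, for instance when a client library rewinds the file to retry an upload.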

Source code in c108/io.py
def seek(self, offset: int, whence: int = 0) -> int:
    """
    Change the file position and update progress counters (thread-safe).

    Seeking sets the relevant progress counter (bytes_read in read mode,
    bytes_written otherwise) to the new absolute file position, so
    progress reporting stays accurate after a rewind or jump.

    Args:
        offset: Offset in bytes.
        whence: Reference point: 0=start, 1=current position, 2=end.

    Returns:
        New absolute file position in bytes.

    Raises:
        ValueError: If the file is closed or not seekable.
        OSError: For I/O errors during seek operation.
    """
    with self._lock:
        new_position = self._file.seek(offset, whence)

        # Update progress counter to match new position
        if "r" in self._mode:
            self.bytes_read = new_position
        else:
            # In write/append mode, track the position as bytes written so
            # that seeking back and overwriting does not inflate the count
            self.bytes_written = new_position

        return new_position
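
To illustrate how the counter follows the file position, here is a small sketch. `PositionTracker` is a hypothetical stand-in written for this example; it copies only the `seek()` logic shown above, plus a trivial `read()`, and wraps an in-memory `io.BytesIO` instead of a real file.

```python
import io
import threading


class PositionTracker:
    """Hypothetical stand-in that mirrors only the seek() logic shown above."""

    def __init__(self, raw: io.BytesIO, mode: str = "rb") -> None:
        self._file = raw
        self._mode = mode
        self._lock = threading.Lock()
        self.bytes_read = 0
        self.bytes_written = 0

    def read(self, size: int = -1) -> bytes:
        with self._lock:
            data = self._file.read(size)
            self.bytes_read += len(data)
            return data

    def seek(self, offset: int, whence: int = 0) -> int:
        with self._lock:
            new_position = self._file.seek(offset, whence)
            # The counter is overwritten with the position, not incremented
            if "r" in self._mode:
                self.bytes_read = new_position
            else:
                self.bytes_written = new_position
            return new_position


tracker = PositionTracker(io.BytesIO(b"0123456789"))
tracker.read(6)
after_read = tracker.bytes_read            # 6 bytes consumed so far
tracker.seek(0)                            # rewind to the start
after_rewind = tracker.bytes_read          # counter is back to 0
end_position = tracker.seek(-2, io.SEEK_END)  # two bytes before the end
```

One plausible reason for this design is retry handling: if a caller rewinds the file and reads it again, the counter restarts from the new position instead of growing past the total size.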

seekable()

Check if the file supports seek operations.

Returns:

Type Description
bool

True if the file is seekable.

Source code in c108/io.py
def seekable(self) -> bool:
    """
    Check if the file supports seek operations.

    Returns:
        True if the file is seekable.
    """
    return self._file.seekable()

tell()

Get the current file position (thread-safe).

Returns:

Type Description
int

Current file position in bytes.

Raises:

Type Description
ValueError

If the file is closed.

OSError

For I/O errors.

Source code in c108/io.py
def tell(self) -> int:
    """
    Get the current file position (thread-safe).

    Returns:
        Current file position in bytes.

    Raises:
        ValueError: If the file is closed.
        OSError: For I/O errors.
    """
    with self._lock:
        return self._file.tell()

writable()

Check if the file is writable.

Returns:

Type Description
bool

True if the file is opened for writing.

Source code in c108/io.py
def writable(self) -> bool:
    """
    Check if the file is writable.

    Returns:
        True if the file is opened for writing.
    """
    return "w" in self._mode or "a" in self._mode
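
Both `readable()` and `writable()` are substring tests on the mode string, so a mode containing `+` is classified by its leading letter only. The sketch below reproduces the two expressions as hypothetical free functions (`mode_readable`, `mode_writable`) to show which mode strings each one accepts. It makes no claim about which modes the constructor itself allows.

```python
def mode_readable(mode: str) -> bool:
    """Same expression as readable(), applied to a bare mode string."""
    return "r" in mode


def mode_writable(mode: str) -> bool:
    """Same expression as writable(), applied to a bare mode string."""
    return "w" in mode or "a" in mode


# Collect (readable, writable) for a few common mode strings
results = {
    mode: (mode_readable(mode), mode_writable(mode))
    for mode in ("rb", "wb", "ab", "r+b", "w+b")
}

for mode, (can_read, can_write) in results.items():
    print(f"{mode:>4}  readable={can_read}  writable={can_write}")
```

Under these checks `r+b` counts as readable but not writable, and `w+b` as writable but not readable, even though the underlying file object opened with either mode supports both operations.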

write(data)

Write bytes to the file with progress tracking (thread-safe).

For large writes, data is written in chunks to provide frequent progress updates via the callback function.

Parameters:

Name Type Description Default
data bytes

Bytes to write to the file.

required

Returns:

Type Description
int

Total number of bytes written.

Raises:

Type Description
ValueError

If the file is not open for writing or is closed.

OSError

For I/O errors during write (disk full, etc.).

IOError

For general I/O failures (IOError is an alias of OSError in Python 3).

Source code in c108/io.py
def write(self, data: bytes) -> int:
    """
    Write bytes to the file with progress tracking (thread-safe).

    For large writes, data is written in chunks to provide frequent
    progress updates via the callback function.

    Args:
        data: Bytes to write to the file.

    Returns:
        Total number of bytes written.

    Raises:
        ValueError: If the file is not open for writing or is closed.
        OSError: For I/O errors during write (disk full, etc.).
        IOError: For general I/O failures (alias of OSError in Python 3).
    """
    with self._lock:
        if not self.writable():
            raise ValueError("File not open for writing")

        total_bytes_to_write = len(data)
        bytes_written_this_call = 0

        # Optimize small writes: write directly without chunking
        if total_bytes_to_write <= self.chunk_size:
            result = self._file.write(data)
            self.bytes_written += result

            # Only invoke callback if data was actually written and expected size is not empty
            if self.callback and result > 0 and self._total_size > 0:
                self.callback(self.bytes_written, self._total_size)

            return result

        # For large writes, write in chunks
        for i in range(0, total_bytes_to_write, self.chunk_size):
            chunk = data[i : i + self.chunk_size]
            result = self._file.write(chunk)
            bytes_written_this_call += result
            self.bytes_written += result

            # Report progress only if data was written and the expected size is non-zero
            if self.callback and result > 0 and self._total_size > 0:
                self.callback(self.bytes_written, self._total_size)

        return bytes_written_this_call
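
The chunked write path can be sketched on its own as follows. `write_in_chunks` is a hypothetical free function, not part of the library: it assumes a positive `chunk_size`, takes `expected_size` explicitly in place of `self._total_size`, and counts bytes per call instead of cumulatively.

```python
import io
from typing import BinaryIO, Callable, Optional


def write_in_chunks(
    stream: BinaryIO,
    data: bytes,
    chunk_size: int,
    expected_size: int,
    callback: Optional[Callable[[int, int], None]] = None,
) -> int:
    """Hypothetical helper mirroring the chunked write loop shown above."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive in this sketch")

    written = 0
    for start in range(0, len(data), chunk_size):
        chunk = data[start : start + chunk_size]
        written += stream.write(chunk)

        # One progress report per chunk, skipped when no size is expected
        if callback and expected_size > 0:
            callback(written, expected_size)

    return written


progress = []
sink = io.BytesIO()
payload = b"a" * 10
count = write_in_chunks(
    sink, payload, 4, len(payload),
    lambda current, total: progress.append((current, total)),
)
print(count, progress)  # 10 [(4, 10), (8, 10), (10, 10)]
```

Passing `expected_size=0` suppresses every callback while the data is still written in full, which matches the `self._total_size > 0` guard in the method above.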