diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 3763e21abc9f..9133e8429890 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -34,7 +34,11 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystem import FileMetadata from apache_beam.io.filesystem import FileSystem -from apache_beam.io.gcp import gcsio + +try: + from apache_beam.io.gcp import gcsio +except ImportError: + gcsio = None # type: ignore[assignment] __all__ = ['GCSFileSystem'] @@ -43,13 +47,17 @@ class GCSFileSystem(FileSystem): """A GCS ``FileSystem`` implementation for accessing files on GCS. """ - CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations GCS_PREFIX = 'gs://' def __init__(self, pipeline_options): super().__init__(pipeline_options) self._pipeline_options = pipeline_options + @property + def CHUNK_SIZE(self): + """Chunk size in batch operations.""" + return self._gcsIO().MAX_BATCH_OPERATION_SIZE + @classmethod def scheme(cls): """URI scheme for the FileSystem @@ -371,7 +379,7 @@ def delete(self, paths): def report_lineage(self, path, lineage): try: components = gcsio.parse_gcs_path(path, object_optional=True) - except ValueError: + except (ValueError, AttributeError): # report lineage is fail-safe traceback.print_exc() return diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 08fdd6302887..578745e42bbe 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -48,6 +48,14 @@ def test_scheme(self): self.assertEqual(self.fs.scheme(), 'gs') self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs') + @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio', None) + def test_get_filesystem_does_not_require_gcp_extra(self): + # Verifies that GCSFileSystem can be looked up without GCP deps installed. + # GCP dependency errors should only be raised at usage time, not lookup. + from apache_beam.io.filesystems import FileSystems + fs = FileSystems.get_filesystem('gs://test-bucket/path') + self.assertEqual(fs.scheme(), 'gs') + def test_join(self): self.assertEqual( 'gs://bucket/path/to/file',