Skip to content

Commit 974e83e

Browse files
rouellet99 authored and fhussonnois committed
Delete the compressed file after extraction
Added a configuration to delete the compressed file after extraction
1 parent 8e0e19c commit 974e83e

File tree

5 files changed

+118
-60
lines changed

5 files changed

+118
-60
lines changed

connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java

Lines changed: 16 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,6 @@ public LocalFSDirectoryListing() {
4949
this(Collections.emptyList());
5050
}
5151

52-
5352
/**
5453
* Creates a new {@link LocalFSDirectoryListing} instance.
5554
*
@@ -86,8 +85,7 @@ private Collection<FileObjectMeta> toSourceObjects(final Collection<File> allFil
8685
} catch (ConnectFilePulseException e) {
8786
LOG.warn(
8887
"Failed to read metadata. Object file is ignored: {}",
89-
e.getMessage()
90-
);
88+
e.getMessage());
9189
return Optional.<LocalFileObjectMeta>empty();
9290
}
9391
})
@@ -139,8 +137,16 @@ private List<File> listEligibleFiles(final Path input) {
139137
final Path decompressed = codec.decompress(file).toPath();
140138
listingLocalFiles.addAll(listEligibleFiles(decompressed));
141139
decompressedDirs.add(decompressed);
142-
} catch (IOException e) {
143-
LOG.warn("Error while decompressing input file '{}'. Skip and continue.", path, e);
140+
if (config.isDeleteCompressFileEnable() && decompressed.toFile().exists()) {
141+
file.delete();
142+
}
143+
LOG.debug("Compressed file deleted successfully : {}", path);
144+
} catch (IOException | SecurityException e) {
145+
if (e instanceof IOException) {
146+
LOG.warn("Error while decompressing input file '{}'. Skip and continue.", path, e);
147+
} else if (e instanceof SecurityException) {
148+
LOG.warn("Error while deleting input file '{}'. Skip and continue.", path, e);
149+
}
144150
}
145151
} else {
146152
// If no codec was found for the input file,
@@ -155,17 +161,15 @@ private List<File> listEligibleFiles(final Path input) {
155161
LOG.error(
156162
"Error while getting directory listing for {}: {}",
157163
input,
158-
e.getLocalizedMessage()
159-
);
164+
e.getLocalizedMessage());
160165
throw new ConnectException(e);
161166
}
162167

163168
if (config.isRecursiveScanEnable() && !directories.isEmpty()) {
164169
listingLocalFiles.addAll(directories.stream()
165170
.filter(f -> !decompressedDirs.contains(f))
166171
.flatMap(f -> listEligibleFiles(f).stream())
167-
.collect(Collectors.toList())
168-
);
172+
.collect(Collectors.toList()));
169173
}
170174
return listingLocalFiles;
171175
}
@@ -175,10 +179,9 @@ private boolean isHidden(final Path input) {
175179
return Files.isHidden(input);
176180
} catch (IOException e) {
177181
LOG.warn(
178-
"Error while checking if input file is hidden '{}': {}",
179-
input,
180-
e.getLocalizedMessage()
181-
);
182+
"Error while checking if input file is hidden '{}': {}",
183+
input,
184+
e.getLocalizedMessage());
182185
return false;
183186
}
184187
}

connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListingConfig.java

Lines changed: 29 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -12,30 +12,37 @@
1212

1313
public class LocalFSDirectoryListingConfig extends AbstractConfig {
1414

15-
1615
public static final String FS_LISTING_DIRECTORY_PATH = "fs.listing.directory.path";
1716
public static final String FS_LISTING_DIRECTORY_DOC = "The input directory to scan";
1817

19-
public static final String FS_RECURSIVE_SCAN_ENABLE_CONFIG = "fs.listing.recursive.enabled";
20-
private static final String FS_RECURSIVE_SCAN_ENABLE_DOC = "Boolean indicating whether local directory " +
21-
"should be recursively scanned (default true).";
18+
public static final String FS_RECURSIVE_SCAN_ENABLE_CONFIG = "fs.listing.recursive.enabled";
19+
private static final String FS_RECURSIVE_SCAN_ENABLE_DOC = "Boolean indicating whether local directory " +
20+
"should be recursively scanned (default true).";
21+
22+
public static final String FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG = "fs.delete.compress.files.enabled";
23+
private static final String FS_DELETE_COMPRESS_FILES_ENABLE_DOC = "Flag indicating whether compressed file " +
24+
"should be deleted after extraction (default false)";
2225

2326
public static ConfigDef getConf() {
2427
return new ConfigDef()
25-
.define(
26-
FS_LISTING_DIRECTORY_PATH,
27-
ConfigDef.Type.STRING,
28-
ConfigDef.Importance.HIGH,
29-
FS_LISTING_DIRECTORY_DOC
30-
)
31-
32-
.define(
33-
FS_RECURSIVE_SCAN_ENABLE_CONFIG,
34-
ConfigDef.Type.BOOLEAN,
35-
true,
36-
ConfigDef.Importance.MEDIUM,
37-
FS_RECURSIVE_SCAN_ENABLE_DOC
38-
);
28+
.define(
29+
FS_LISTING_DIRECTORY_PATH,
30+
ConfigDef.Type.STRING,
31+
ConfigDef.Importance.HIGH,
32+
FS_LISTING_DIRECTORY_DOC)
33+
34+
.define(
35+
FS_RECURSIVE_SCAN_ENABLE_CONFIG,
36+
ConfigDef.Type.BOOLEAN,
37+
true,
38+
ConfigDef.Importance.MEDIUM,
39+
FS_RECURSIVE_SCAN_ENABLE_DOC)
40+
.define(
41+
FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG,
42+
ConfigDef.Type.BOOLEAN,
43+
false,
44+
ConfigDef.Importance.MEDIUM,
45+
FS_DELETE_COMPRESS_FILES_ENABLE_DOC);
3946
}
4047

4148
/**
@@ -53,4 +60,8 @@ public boolean isRecursiveScanEnable() {
5360
public String listingDirectoryPath() {
5461
return this.getString(FS_LISTING_DIRECTORY_PATH);
5562
}
63+
64+
public boolean isDeleteCompressFileEnable() {
65+
return getBoolean(FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG);
66+
}
5667
}

connect-file-pulse-filesystems/filepulse-local-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListingTest.java

Lines changed: 55 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@
2727
public class LocalFSDirectoryListingTest {
2828

2929
private static final String DEFAULT_ENTRY_FILE_NAME = "file-entry-0.txt";
30-
private static final String DEFAULT_ARCHIVE_NAME = "archive";
30+
private static final String DEFAULT_ARCHIVE_NAME = "archive";
3131
private static final String TEST_SCAN_DIRECTORY = "test-scan";
3232

3333
@Rule
@@ -54,47 +54,79 @@ public void shouldExtractXZGipCompressedFilesPathWhileScanningGivenRecursiveScan
5454
zos.closeEntry();
5555
}
5656

57-
scanner.configure(new HashMap<>() {{
58-
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false);
59-
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
60-
}});
57+
scanner.configure(new HashMap<>() {
58+
{
59+
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false);
60+
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
61+
}
62+
});
6163

6264
final Collection<FileObjectMeta> scanned = scanner.listObjects();
6365
Assert.assertEquals(1, scanned.size());
64-
String expected = String.join(File.separator, Arrays.asList(inputDirectory.getCanonicalPath(), DEFAULT_ARCHIVE_NAME, DEFAULT_ENTRY_FILE_NAME));
66+
String expected = String.join(File.separator,
67+
Arrays.asList(inputDirectory.getCanonicalPath(), DEFAULT_ARCHIVE_NAME, DEFAULT_ENTRY_FILE_NAME));
6568
Assert.assertEquals(expected, getCanonicalPath(scanned.iterator().next()));
6669
}
6770

6871
@Test
69-
public void shouldExtractGzipCompressedFiles() throws IOException {
72+
public void shouldExtractGzipCompressedFilesAndKeepGzipFileAfterExtraction() throws IOException {
7073
File archiveFile = new File(inputDirectory, DEFAULT_ARCHIVE_NAME + ".gz");
7174

7275
try (GZIPOutputStream os = new GZIPOutputStream(new FileOutputStream(archiveFile))) {
7376
byte[] data = "dummy".getBytes();
7477
os.write(data, 0, data.length);
7578
}
7679

77-
scanner.configure(new HashMap<>() {{
78-
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
79-
}});
80+
scanner.configure(new HashMap<>() {
81+
{
82+
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
83+
}
84+
});
8085

8186
final Collection<FileObjectMeta> scanned = scanner.listObjects();
8287
Assert.assertEquals(1, scanned.size());
8388
String expected = String.join(File.separator, Arrays.asList(inputDirectory.getCanonicalPath(),
8489
DEFAULT_ARCHIVE_NAME, DEFAULT_ARCHIVE_NAME));
8590
Assert.assertEquals(expected, getCanonicalPath(scanned.iterator().next()));
91+
Assert.assertTrue(archiveFile.exists());
8692
}
87-
93+
94+
@Test
95+
public void shouldExtractGzipCompressedFilesAndDeleteGzipFileAfterExtraction() throws IOException {
96+
File archiveFile = new File(inputDirectory, DEFAULT_ARCHIVE_NAME + ".gz");
97+
98+
try (GZIPOutputStream os = new GZIPOutputStream(new FileOutputStream(archiveFile))) {
99+
byte[] data = "dummy".getBytes();
100+
os.write(data, 0, data.length);
101+
}
102+
103+
scanner.configure(new HashMap<>() {
104+
{
105+
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
106+
put(LocalFSDirectoryListingConfig.FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG, true);
107+
}
108+
});
109+
110+
final Collection<FileObjectMeta> scanned = scanner.listObjects();
111+
Assert.assertEquals(1, scanned.size());
112+
String expected = String.join(File.separator, Arrays.asList(inputDirectory.getCanonicalPath(),
113+
DEFAULT_ARCHIVE_NAME, DEFAULT_ARCHIVE_NAME));
114+
Assert.assertEquals(expected, getCanonicalPath(scanned.iterator().next()));
115+
Assert.assertTrue(!archiveFile.exists());
116+
}
117+
88118
@Test
89119
public void shouldListFilesGivenRecursiveScanEnable() throws IOException {
90-
folder.newFolder(TEST_SCAN_DIRECTORY , "sub-directory");
120+
folder.newFolder(TEST_SCAN_DIRECTORY, "sub-directory");
91121
final File file1 = folder.newFile(TEST_SCAN_DIRECTORY + "/test-file1.txt");
92122
final File file2 = folder.newFile(TEST_SCAN_DIRECTORY + "/sub-directory/test-file2.txt");
93123

94-
scanner.configure(new HashMap<String, Object>(){{
95-
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, true);
96-
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
97-
}});
124+
scanner.configure(new HashMap<String, Object>() {
125+
{
126+
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, true);
127+
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
128+
}
129+
});
98130

99131
final Collection<String> scanned = scanner
100132
.listObjects()
@@ -117,14 +149,16 @@ private String getCanonicalPath(final FileObjectMeta s) {
117149

118150
@Test
119151
public void shouldListFilesGivenRecursiveScanDisable() throws IOException {
120-
folder.newFolder(TEST_SCAN_DIRECTORY , "sub-directory");
152+
folder.newFolder(TEST_SCAN_DIRECTORY, "sub-directory");
121153
final File file1 = folder.newFile(TEST_SCAN_DIRECTORY + "/test-file1.txt");
122154
folder.newFile(TEST_SCAN_DIRECTORY + "/sub-directory/test-file2.txt"); // will not be scanned
123155

124-
scanner.configure(new HashMap<String, Object>(){{
125-
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false);
126-
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
127-
}});
156+
scanner.configure(new HashMap<String, Object>() {
157+
{
158+
put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false);
159+
put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath());
160+
}
161+
});
128162

129163
final Collection<String> scanned = scanner
130164
.listObjects()

connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/offset/DefaultOffsetPolicyTest.java

Lines changed: 13 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -13,19 +13,28 @@
1313
import java.util.Collections;
1414
import java.util.Map;
1515
import java.util.stream.Collectors;
16+
import org.apache.commons.lang3.SystemUtils;
1617
import org.junit.Assert;
1718
import org.junit.Test;
1819

1920
public class DefaultOffsetPolicyTest {
2021

22+
private String getValidPath() {
23+
String validPath = SystemUtils.OS_NAME;
24+
if (validPath.contains("Windows")) {
25+
return "C:\\tmp\\path";
26+
} else {
27+
return "/tmp/path";
28+
}
29+
}
30+
2131
private static final GenericFileObjectMeta metadata = new GenericFileObjectMeta(
2232
URI.create("file:///tmp/path/test"),
2333
"test",
2434
0L,
2535
123L,
2636
new FileObjectMeta.ContentDigest("789", "dummy"),
27-
Collections.singletonMap(LocalFileObjectMeta.SYSTEM_FILE_INODE_META_KEY, "456L")
28-
);
37+
Collections.singletonMap(LocalFileObjectMeta.SYSTEM_FILE_INODE_META_KEY, "456L"));
2938

3039
@Test(expected = IllegalArgumentException.class)
3140
public void should_throw_illegal_argument_given_empty_strategy() {
@@ -46,7 +55,7 @@ public void should_throw_npe_given_unknown_strategy() {
4655
public void should_get_offset_based_on_path() {
4756
Map<String, Object> result = new DefaultSourceOffsetPolicy("PATH").toPartitionMap(metadata);
4857
Assert.assertEquals(1, result.size());
49-
Assert.assertEquals("/tmp/path", result.get("path"));
58+
Assert.assertEquals(getValidPath(), result.get("path"));
5059
}
5160

5261
@Test
@@ -76,7 +85,7 @@ public void should_get_offset_based_on_name() {
7685
public void should_get_composed_offset_based_on_path_and_hash() {
7786
Map<String, Object> result = new DefaultSourceOffsetPolicy("PATH+HASH").toPartitionMap(metadata);
7887
Assert.assertEquals(2, result.size());
79-
Assert.assertEquals("/tmp/path", result.get("path"));
88+
Assert.assertEquals(getValidPath(), result.get("path"));
8089
Assert.assertEquals("789", result.get("hash"));
8190
}
8291

docs/content/en/docs/Developer Guide/file-system-listing/local-filesystem.md

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,10 +19,11 @@ Use the following property in your Connector's configuration:
1919

2020
The following table describes the properties that can be used to configure the `LocalFSDirectoryListing`:
2121

22-
| Configuration | Description | Type | Default | Importance |
23-
|--------------------------------|-----------------------------------------------------------------------|-----------|---------|------------|
24-
| `fs.listing.directory.path` | The input directory to scan | `string` | - | HIGH |
25-
| `fs.listing.recursive.enabled` | Flag indicating whether local directory should be recursively scanned | `boolean` | `true` | MEDIUM |
22+
| Configuration | Description | Type | Default | Importance |
23+
| ---------------------------------- | -------------------------------------------------------------------------- | --------- | ------- | ---------- |
24+
| `fs.listing.directory.path` | The input directory to scan | `string` | - | HIGH |
25+
| `fs.listing.recursive.enabled` | Flag indicating whether local directory should be recursively scanned | `boolean` | `true` | MEDIUM |
26+
| `fs.delete.compress.files.enabled` | Flag indicating whether compressed file should be deleted after extraction | `boolean` | `false` | MEDIUM |
2627

2728
## Supported File types
2829

0 commit comments

Comments
 (0)