1 package guru.mikelue.jdut.decorate;
2
3 import java.sql.Connection;
4 import java.sql.DatabaseMetaData;
5 import java.sql.JDBCType;
6 import java.sql.ResultSet;
7 import java.sql.SQLException;
8 import java.util.ArrayList;
9 import java.util.Collections;
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Optional;
14 import java.util.concurrent.ConcurrentHashMap;
15
16 import javax.sql.DataSource;
17
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import guru.mikelue.jdut.datagrain.DataRow;
22 import guru.mikelue.jdut.datagrain.DataRowException;
23 import guru.mikelue.jdut.datagrain.SchemaColumn;
24 import guru.mikelue.jdut.datagrain.SchemaTable;
25 import guru.mikelue.jdut.jdbc.JdbcSupplier;
26 import guru.mikelue.jdut.jdbc.JdbcTemplateFactory;
27 import guru.mikelue.jdut.jdbc.JdbcVoidFunction;
28 import guru.mikelue.jdut.jdbc.SQLExceptionConvert;
29 import guru.mikelue.jdut.jdbc.util.MetaDataWorker;
30
31
32
33
34
35
36
37 public class TableSchemaLoadingDecorator implements DataGrainDecorator {
38 private final static Logger logger = LoggerFactory.getLogger(TableSchemaLoadingDecorator.class);
39
40 private final DataSource dataSource;
41 private Map<String, SchemaTable> cachedTables = new ConcurrentHashMap<>(32);
42
43 public TableSchemaLoadingDecorator(DataSource newDataSource)
44 {
45 dataSource = newDataSource;
46 }
47
48
49
50
51 @Override
52 public void decorate(DataRow.Builder rowBuilder)
53 {
54 if (rowBuilder.getValidated()) {
55 return;
56 }
57
58
59
60
61 SchemaTable table = rowBuilder.getTable();
62 String tableIdentifier = table.getFullTableName();
63 logger.debug("Decorate table for loading schema: [{}]", tableIdentifier);
64
65 if (!cachedTables.containsKey(tableIdentifier)) {
66 logger.debug("First time of loading schema");
67 SchemaTable newTableSchema = loadSchema(table);
68
69 cachedTables.put(tableIdentifier, newTableSchema);
70 }
71
72
73 rowBuilder.tableSchema(cachedTables.get(tableIdentifier));
74
75
76
77
78 try {
79 rowBuilder.validate();
80 } catch (DataRowException e) {
81 logger.error("Validation of row[{}] has error", rowBuilder.getTable());
82 throw new RuntimeException(e);
83 }
84
85 }
86
87 private SchemaTable../../guru/mikelue/jdut/datagrain/SchemaTable.html#SchemaTable">SchemaTable loadSchema(SchemaTable source)
88 {
89 JdbcSupplier<SchemaTable> funcForLoadingSchema = JdbcTemplateFactory.buildSupplier(
90 () -> dataSource.getConnection(),
91 conn -> loadSchema(conn, source)
92 );
93
94 try {
95 return funcForLoadingSchema.getJdbc();
96 } catch (SQLException e) {
97 logger.error("SQL error while loading schema of table: {}", source.toString());
98 throw SQLExceptionConvert.runtimeException(e);
99 }
100 }
101
102 private SchemaTablee/jdut/datagrain/SchemaTable.html#SchemaTable">SchemaTable loadSchema(Connection conn, SchemaTable sourceTable)
103 throws SQLException
104 {
105 DatabaseMetaData metaData = conn.getMetaData();
106 MetaDataWorker metaDataWorker = new MetaDataWorker(metaData);
107 SchemaAndTableName cananicalName = processSchemaAndTableName(metaDataWorker, sourceTable);
108
109 logger.debug("Load schema for: {}", cananicalName);
110
111
112
113
114
115 return SchemaTable.build(tableBuilder -> {
116 tableBuilder.metaDataWorker(metaDataWorker);
117 tableBuilder.schema(cananicalName.schema);
118 tableBuilder.name(cananicalName.table);
119 tableBuilder.keys(sourceTable.getKeys().toArray(new String[0]));
120
121 Map<String, SchemaColumn> loadedColumns = loadColumns(cananicalName, metaData);
122 for (SchemaColumn column: loadedColumns.values()) {
123 tableBuilder.column(column);
124 }
125
126
127
128
129 if (sourceTable.getKeys().isEmpty()) {
130 logger.debug("Fetch keys for table: \"{}\"", sourceTable.getName());
131
132 String[] keys = loadKeys(loadedColumns, metaData, cananicalName);
133 tableBuilder.keys(keys);
134 }
135
136 });
137
138 }
139
140 private SchemaAndTableName processSchemaAndTableName(MetaDataWorker metaDataWorker, SchemaTable sourceTable)
141 throws SQLException
142 {
143 String catalog = sourceTable.getCatalog()
144 .map(metaDataWorker::processIdentifier)
145 .orElse(null);
146 String sourceTableName = metaDataWorker.processIdentifier(
147 sourceTable.getName()
148 );
149
150 if (
151 !metaDataWorker.supportsSchemasInTableDefinitions() ||
152 !sourceTableName.contains(".") ||
153 sourceTable.getSchema().isPresent()
154 ) {
155 return new SchemaAndTableName(
156 catalog,
157 metaDataWorker.processIdentifier(sourceTable.getSchema().orElse(null)),
158 sourceTableName
159 );
160 }
161
162 String[] schemaAndTableName = sourceTableName.split("\\.");
163 if (schemaAndTableName.length > 2) {
164 throw new RuntimeException(
165 String.format("Cannot recgonize schema and table name: \"%s\"", sourceTableName)
166 );
167 }
168
169 return new SchemaAndTableName(
170 catalog,
171 metaDataWorker.processIdentifier(schemaAndTableName[0]),
172 metaDataWorker.processIdentifier(schemaAndTableName[1])
173 );
174 }
175
176 private Map<String, SchemaColumn> loadColumns(
177 SchemaAndTableName cananicalName,
178 DatabaseMetaData metaData
179 ) {
180 logger.debug("Load columns for: {}", cananicalName);
181
182 JdbcSupplier<Map<String, SchemaColumn>> jdbcGetColumns = JdbcTemplateFactory.buildSupplier(
183 () -> metaData.getColumns(
184 cananicalName.catalog,
185 cananicalName.schema, cananicalName.table,
186 null
187 ),
188
189
190
191 (ResultSet rsColumns) -> {
192 Map<String, SchemaColumn> columns = new HashMap<>();
193
194 while (rsColumns.next()) {
195 String columnName = rsColumns.getString("COLUMN_NAME");
196 JDBCType jdbcType = JDBCType.valueOf(rsColumns.getInt("DATA_TYPE"));
197
198 logger.debug("Loading meta-data of columns: \"{}\". Type: [{}]",
199 columnName, jdbcType
200 );
201
202 JdbcVoidFunction<SchemaColumn.Builder> columnBuilder = builder -> {
203 builder
204 .name(columnName)
205 .jdbcType(jdbcType)
206 .defaultValue(rsColumns.getString("COLUMN_DEF"));
207
208 try {
209 String autoIncremental = rsColumns.getString("IS_AUTOINCREMENT");
210 if (autoIncremental != null) {
211 builder.autoIncremental("YES".equals(autoIncremental) ? true : false);
212 }
213 } catch (SQLException e) {
214 logger.info("This database doesn't have \"IS_AUTOINCREMENT\" meta data of JDBC");
215 }
216
217 switch (rsColumns.getInt("NULLABLE")) {
218 case DatabaseMetaData.columnNullable:
219 builder.nullable(true);
220 break;
221 case DatabaseMetaData.columnNoNulls:
222 builder.nullable(false);
223 break;
224 }
225 };
226
227 SchemaColumn loadedColumn =
228 SchemaColumn.build(columnBuilder.asConsumer());
229 columns.put(loadedColumn.getName(), loadedColumn);
230 }
231
232
233 return columns;
234 }
235 );
236
237 return jdbcGetColumns.asSupplier().get();
238 }
239
240 private String[] loadKeys(
241 Map<String, SchemaColumn> columnsInfo,
242 DatabaseMetaData metaData,
243 SchemaAndTableName cananicalName
244 ) {
245 logger.debug("Load keys for: {}", cananicalName);
246
247 List<String> keys = null;
248
249
250
251
252 JdbcSupplier<List<String>> loadKeysByPk = JdbcTemplateFactory.buildSupplier(
253 () -> metaData.getPrimaryKeys(
254 cananicalName.catalog,
255 cananicalName.schema,
256 cananicalName.table
257 ),
258 rs -> {
259 List<String> loadedKeys = new ArrayList<>(4);
260
261 while (rs.next()) {
262 String pkName = rs.getString("COLUMN_NAME");
263
264 logger.debug("Fetch key by primary key: \"{}\"", pkName);
265 loadedKeys.add(pkName);
266 }
267
268 return loadedKeys;
269 }
270 );
271 keys = loadKeysByPk.asSupplier().get();
272 if (!keys.isEmpty()) {
273 return keys.toArray(new String[0]);
274 }
275
276
277 logger.debug("Deduce keys for: {}", cananicalName);
278 String[] deducedKeys = deduceKeys(
279 columnsInfo,
280 () -> metaData.getIndexInfo(
281 cananicalName.catalog,
282 cananicalName.schema,
283 cananicalName.table,
284 true, true
285 )
286 ).toArray(new String[0]);
287
288 return deducedKeys;
289 }
290
291 private List<String> deduceKeys(
292 Map<String, SchemaColumn> columnsInfo,
293 JdbcSupplier<ResultSet> rsUniqueIndexSupplier
294 ) {
295 JdbcSupplier<List<String>> supplier = JdbcTemplateFactory.buildSupplier(
296 rsUniqueIndexSupplier,
297 rs -> {
298 Map<String, List<String>> indexes = new HashMap<>(4);
299 Map<String, Integer> numberOfNullableColumns = new HashMap<>(4);
300
301 while (rs.next()) {
302 String columnName = rs.getString("COLUMN_NAME");
303 if (columnName == null) {
304 continue;
305 }
306
307 String indexName = rs.getString("INDEX_NAME");
308 SchemaColumn schemaColumn = columnsInfo.get(columnName);
309
310 logger.debug(
311 "Collected information of unique index: \"{}\" and column: \"{}\". Nullable: [{}]",
312 indexName, columnName, schemaColumn.getNullable().get()
313 );
314
315
316
317
318 if (!indexes.containsKey(indexName)) {
319 indexes.put(indexName, new ArrayList<>(4));
320 numberOfNullableColumns.put(indexName, 0);
321 }
322
323 indexes.get(indexName).add(columnName);
324 numberOfNullableColumns.put(
325 indexName,
326 numberOfNullableColumns.get(indexName) +
327 ( schemaColumn.getNullable().get() ? 1 : 0 )
328 );
329
330 }
331
332
333
334
335 Optional<Map.Entry<String, Integer>> notNullIndexWithLeastColumns = numberOfNullableColumns.entrySet().stream()
336 .filter(
337
338 indexEntry -> indexEntry.getValue() == 0
339 )
340 .min((entryLeft, entryRight) ->
341 Integer.compare(
342 indexes.get(entryLeft.getKey()).size(),
343 indexes.get(entryRight.getKey()).size()
344 )
345 );
346
347 if (notNullIndexWithLeastColumns.isPresent()) {
348 List<String> nonNullKeys = indexes.get(
349 notNullIndexWithLeastColumns.get().getKey()
350 );
351 logger.debug("Got non null keys: {}", nonNullKeys);
352 return nonNullKeys;
353 }
354
355
356
357
358
359 Optional<Map.Entry<String, Integer>> nullableIndexWithLeastColumns = numberOfNullableColumns.entrySet().stream()
360 .min((entryLeft, entryRight) ->
361 entryLeft.getValue().compareTo(
362 entryRight.getValue()
363 )
364 );
365
366 if (nullableIndexWithLeastColumns.isPresent()) {
367 List<String> nullableKeys = indexes.get(
368 nullableIndexWithLeastColumns.get().getKey()
369 );
370 logger.debug("Got nullable keys: {}", nullableKeys);
371 return nullableKeys;
372 }
373
374
375 return Collections.<String>emptyList();
376 }
377 );
378
379 return supplier.asSupplier().get();
380 }
381 }
382
383 class SchemaAndTableName {
384 String catalog;
385 String schema;
386 String table;
387
388 SchemaAndTableName(String newCatalog, String newSchema, String newTable)
389 {
390 catalog = newCatalog;
391 schema = newSchema;
392 table = newTable;
393 }
394
395 @Override
396 public String toString()
397 {
398 return String.format(
399 "Cananical name(<catalog>.<schema>.<table>): [%s.%s.%s]",
400 catalog == null ? "<null>" : catalog,
401 schema == null ? "<null>" : schema,
402 table
403 );
404 }
405 }