001 /* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016 package org.opengion.fukurou.process; 017 018 import org.opengion.fukurou.util.Argument; 019 import org.opengion.fukurou.util.SystemParameter; 020 import org.opengion.fukurou.util.LogWriter; 021 022 import org.opengion.fukurou.util.HybsEntry ; 023 import org.opengion.fukurou.util.Closer; 024 import org.opengion.fukurou.db.ConnectionFactory; 025 026 import java.util.Set ; 027 import java.util.HashSet ; 028 import java.util.Map ; 029 import java.util.LinkedHashMap ; 030 031 import java.sql.Connection; 032 import java.sql.Statement; 033 import java.sql.ResultSet; 034 import java.sql.SQLException; 035 036 /** 037 * Process_BulkQueryは、データベ?スから読み取った?容を??処?るために? 038 * ParamProcess のサブクラス(Process_DBParam)にセ?したり??したりす? 039 * FirstProcess と、ChainProcess のインターフェースを両方持った?実?ラスです? 040 * 041 * こ?クラスは、上流から?下流への処???度しか実行されません? 042 * FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します? 043 * ChainProcess は、その結果を取り?し?自??身の処?果と合せて?します? 044 * 045 * FirstProcess では?action は、query のみです? 046 * query は、指定?SQL?実行し、結果のSetをParamProcessに設定します? 047 * ChainProcess では?action は、query、bulkSet、minus、intersect が指定できます? 048 * query は、上記と同じです? 049 * minus は、?のSetから、SQL??実行結果を引き算し、結果Setを?設定します? 050 * intersect は、?のSetから、SQL??実行結果と重?る結果Setを?設定します? 051 * bulkSet は、?のSetを取り?し?SQL??して処?ます? 052 * 流れ?は、query で検索し?minusまた?intersect でSetオブジェクトを?し?bulkSet で 053 * 利用します?例えば、ORACLEから、ユニ?クキーのSetを作?し?SQLServerのユニ?クキー? 054 * minusした結果を?ORACLEからDELETEすれば、不要な??タを削除するなどの処?実行可能になります? 055 * また?単純に、query ?を?チェインすれば、単発のUPDATE?実行することが可能です? 056 * 057 * ??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に 058 * 設定された接?Connection)を使用します? 059 * DBID は、Process_DBParam の -configFile で?す?DBConfig.xml ファイルを使用します? 060 * 061 * 引数??中にスペ?スを含??合?、ダブルコー??ション("") で括って下さ?? 062 * 引数??の ?』?前後には、スペ?スは挟めません。??key=value の様に 063 * 繋げてください? 064 * 065 * SQL?は、{@DATE.YMDH}等?シス?変数が使用できます? 066 * 067 * @og.formSample 068 * Process_BulkQuery -action=query -dbid=DBGE -sql="select KEY from TABLE_X" 069 * 070 * -action=処????) ??実行する??法を?しま? 071 * -action=query 単なるSQL?実行します? 072 * -action=bulkSet 実行したSQL??結果を?Set<String> オブジェクトに設定します? 073 * -action=minus Set<String> オブジェクトと、ここでの実行結果の差?とります? 074 * -action=intersect Set<String> オブジェクトと、ここでの実行結果の積?をとります? 075 * [ -dbid=DB接続ID ] ??-dbid=DBGE (? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? 076 * [ -sql=検索SQL? ] ??-sql="select * from GEA08" 077 * [ -sqlFile=検索SQLファイル ] ??-sqlFile=select.sql 078 * -sql= を指定しな??合?、ファイルで??してください? 079 * [ -sql_XXXX=固定? ] ??-sql_SYSTEM_ID=GE 080 * SQL?の{@XXXX}??を指定?固定?で置き換えます? 081 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE' 082 * [ -bulkKey=XXXX ] ??-bulkKey=XXXX 083 * SQL?の{@XXXX}??をProcess_BulkQuery等で取得した?で置き換えます? 084 * WHERE SYSTEM_ID IN ( {@XXXX} ) ?WHERE SYSTEM_ID IN ( 'AA','BB','CC' ) 085 * [ -bulkType=NUM|STR ] ??-bulType=STR 086 * Bulkの値を文字?に変換する場合に、数字型か??型を指定します? 087 * 数字型では、AA,BB,CC とし??型では?AA','BB','CC' に変換しま?初期値:STR)? 088 * [ -fetchSize=100 ] ?フェ?する行数(初期値:100) 089 * [ -display=false|true ] ?結果を標準?力に表示する(true)かしな?false)?初期値:false[表示しない]) 090 * [ -debug=false|true ] ?デバッグ??を標準?力に表示する(true)かしな?false)?初期値:false[表示しない]) 091 * 092 * @og.rev 5.3.4.0 (2011/04/01) 新規追? 093 * @version 4.0 094 * @author Kazuhiko Hasegawa 095 * @since JDK5.0, 096 */ 097 public class Process_BulkQuery extends AbstractProcess implements FirstProcess , ChainProcess { 098 private static final int MAX_BULK_SET = 500 ; // ORACLE の制? 1000 なので? 099 100 private static final String ACT_QUERY = "query" ; 101 private static final String ACT_BULKSET = "bulkSet" ; 102 private static final String ACT_MINUS = "minus" ; 103 private static final String ACT_INTERSECT = "intersect" ; 104 105 private static final String[] ACTION_LST = new String[] { ACT_QUERY,ACT_BULKSET,ACT_MINUS,ACT_INTERSECT }; 106 107 // private LineModel newData = null; 108 109 private String actionCmd = null; // SQL結果を加工(query:実行?minus:引き算?intersect:重??) 110 private String dbid = null; // メインDB接続ID 111 112 private String bulkKey = null; 113 private boolean bulkType = true; // true:STR , false:NUM 114 115 private int sqlCount = 0; // SQL??処?数 116 private int setCount = 0; // 取り出したSetの件数 117 private int outCount = 0; // マ?ジ後?Setの件数 118 119 private int fetchSize = 100; 120 private boolean display = false; // 表示しな? 121 private boolean debug = false; // ???? 122 private boolean firstTime = true; // ??の?目 123 124 private static final Map<String,String> mustProparty ; // ?プロパティ???チェ?用 Map 125 private static final Map<String,String> usableProparty ; // ?プロパティ?整合?チェ? Map 126 127 static { 128 mustProparty = new LinkedHashMap<String,String>(); 129 mustProparty.put( "action", "実行する??法を?します?(query|minus|intersect)" ); 130 131 usableProparty = new LinkedHashMap<String,String>(); 132 usableProparty.put( "dbid", "Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? ); 133 usableProparty.put( "sql", "検索SQL?sql or sqlFile ??)? \"select * from GEA08\"" ); 134 usableProparty.put( "sqlFile", "検索SQLファイル(sql or sqlFile ??)? select.sql" ); 135 usableProparty.put( "sql_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" + 136 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" ); 137 usableProparty.put( "dbid2", "DB接続ID2 ? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? ); 138 usableProparty.put( "sql2", "検索SQL?(sql or sqlFile ??)? \"select * from GEA08\"" ); 139 usableProparty.put( "sqlFile2", "検索SQLファイル2(sql or sqlFile ??)? select.sql" ); 140 usableProparty.put( "sql2_", "SQL?中の{@XXXX}??を指定?固定?で置き換えます?" + 141 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" ); 142 usableProparty.put( "bulkKey", "SQL?の{@XXXX}??をProcess_BulkQuery等で取得した?で置き換えます?" + 143 CR + "WHERE SYSTEM_ID IN ( {@XXXX} ) ?WHERE SYSTEM_ID IN ( 'AA','BB','CC' )" ); 144 usableProparty.put( "bulkType", "Bulkの値を文字?に変換する場合に、文字型か?数字型を指定します?" + 145 CR + "数字型では、AA,BB,CC とし??型では?AA','BB','CC' に変換します?(初期値:STR)" ); 146 usableProparty.put( "fetchSize","フェ?する行数 (初期値:100)" ); 147 usableProparty.put( "display", "結果を標準?力に表示する(true)かしな?false)? + 148 CR + "(初期値:false:表示しな?" ); 149 usableProparty.put( "debug", "????を標準?力に表示する(true)かしな?false)? + 150 CR + "(初期値:false:表示しな?" ); 151 } 152 153 /** 154 * ?ォルトコンストラクター? 155 * こ?クラスは、動??されます??ォルトコンストラクターで? 156 * super クラスに対して、?な初期化を行っておきます? 157 * 158 */ 159 public Process_BulkQuery() { 160 super( "org.opengion.fukurou.process.Process_BulkQuery",mustProparty,usableProparty ); 161 } 162 163 /** 164 * プロセスの初期化を行います?初めに??、呼び出されます? 165 * 初期処?ファイルオープン??オープン?に使用します? 166 * 167 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追? 168 * 169 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク? 170 */ 171 public void init( final ParamProcess paramProcess ) { 172 Argument arg = getArgument(); 173 174 actionCmd = arg.getProparty("action" , null , ACTION_LST ); 175 176 fetchSize = arg.getProparty("fetchSize",fetchSize); 177 display = arg.getProparty("display",display); 178 debug = arg.getProparty("debug",debug); 179 // if( debug ) { println( arg.toString() ); } // 5.7.3.0 (2014/02/07) ???? 180 181 dbid = arg.getProparty("dbid"); 182 String sql = arg.getFileProparty("sql","sqlFile",true); 183 if( debug ) { println( "入力SQL:" + sql ); } 184 185 HybsEntry[] entry =arg.getEntrys( "sql_" ); //配? 186 SystemParameter sysParam = new SystemParameter( sql ); 187 sql = sysParam.replace( entry ); 188 if( debug ) { println( "変換SQL:" + sql ); } 189 190 if( ACT_BULKSET.equalsIgnoreCase( actionCmd ) ) { 191 bulkKey = arg.getProparty("bulkKey"); 192 String bkType = arg.getProparty("bulkType"); 193 if( bkType != null ) { bulkType = bkType.equalsIgnoreCase( "STR" ); } 194 195 Set<String> setData = paramProcess.getBulkData(); 196 if( debug ) { println( setData.toString() ); } 197 setCount = setData.size(); 198 199 if( setCount > 0 ) { 200 // 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追? 201 // sql = makeBulkQuery( sql,bulkKey,bulkType,setData ); 202 // if( debug ) { println( "BulkSQL:" + sql ); } 203 // createSetData( paramProcess, dbid, sql ); 204 String[] sqls = makeBulkQuery( sql,bulkKey,bulkType,setData ); 205 for( int i=0; i<sqls.length; i++ ) { 206 if( debug ) { println( "BulkSQL:" + sqls[i] ); } 207 createSetData( paramProcess, dbid, sqls[i] ); 208 } 209 } 210 } 211 else if( ACT_QUERY.equalsIgnoreCase( actionCmd ) ) { 212 Set<String> setData2 = createSetData( paramProcess, dbid, sql ); 213 if( debug ) { println( setData2.toString() ); } 214 setCount = setData2.size(); 215 outCount = setCount; 216 paramProcess.setBulkData( setData2 ); 217 } 218 else { 219 Set<String> setData = paramProcess.getBulkData(); 220 Set<String> setData2 = createSetData( paramProcess, dbid, sql ); 221 setCount = setData2.size(); 222 223 if( ACT_MINUS.equalsIgnoreCase( actionCmd ) ) { 224 setData.removeAll( setData2 ); 225 } 226 else if( ACT_INTERSECT.equalsIgnoreCase( actionCmd ) ) { 227 setData.retainAll( setData2 ); 228 } 229 outCount = setData.size(); 230 if( debug ) { println( setData.toString() ); } 231 paramProcess.setBulkData( setData ); 232 } 233 } 234 235 /** 236 * プロセスの終?行います??に??、呼び出されます? 237 * 終???ファイルクローズ??クローズ?に使用します? 238 * 239 * @param isOK ト?タルで、OK?たかど? [true:成功/false:失敗] 240 */ 241 public void end( final boolean isOK ) { 242 // 何もありません? 243 } 244 245 /** 246 * こ???タの処?おいて、次の処?出来るかど?を問?わせます? 247 * こ?呼び出し1回毎に、次の??タを取得する準備を行います? 248 * 249 * @return 処?きる:true / 処?きな?false 250 */ 251 public boolean next() { 252 return firstTime; 253 } 254 255 /** 256 * 引数の LineModel を??るメソ?です? 257 * 変換処?? LineModel を返します? 258 * 後続??行わな?????タのフィルタリングを行う場?は? 259 * null ??タを返します?つまり?null ??タは、後続??行わな? 260 * フラグの代わりにも使用して?す? 261 * なお?変換処?? LineModel と、オリジナルの LineModel が? 262 * 同?、コピ?(クローン)か?、各処?ソ??決めて?す? 263 * ドキュメントに明記されて???合?、副作用が問題になる?合?? 264 * ???とに自?コピ?(クローン)して下さ?? 265 * 266 * @param data オリジナルのLineModel 267 * 268 * @return 処?換後?LineModel 269 */ 270 @SuppressWarnings(value={"unchecked"}) 271 public LineModel action( final LineModel data ) { 272 return data ; 273 } 274 275 /** 276 * ??に?行データである LineModel を作?しま? 277 * FirstProcess は、次?処?チェインして???の行データ? 278 * 作?して、後続? ChainProcess クラスに処?ータを渡します? 279 * 280 * @param rowNo 処?の行番号 281 * 282 * @return 処?換後?LineModel 283 */ 284 public LineModel makeLineModel( final int rowNo ) { 285 firstTime = false; // ?しか処?な?め?false を設定する? 286 287 LineModel model = new LineModel(); 288 289 model.setRowNo( rowNo ); 290 291 return model; 292 } 293 294 /** 295 * ?で使用する Set オブジェクトを作?します? 296 * Exception 以外では、? Set<String> オブジェクトを返します? 297 * 298 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追? 299 * 300 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク? 301 * @param dbid 接続?ID 302 * @param sql 実行するSQL?検索系) 303 * 304 * @return 実行結果から取り出した、最初?カラ??みを集めた Setオブジェク? 305 * @throws RuntimeException ??タベ?ス処?できなかった?合? 306 */ 307 private Set<String> createSetData( final ParamProcess paramProcess, final String dbid, final String sql ) { 308 Set<String> data = new HashSet<String>(); 309 310 Connection connection = null; 311 Statement stmt = null; 312 ResultSet resultSet = null; 313 314 try { 315 connection = paramProcess.getConnection( dbid ); 316 stmt = connection.createStatement(); 317 if( fetchSize > 0 ) { stmt.setFetchSize( fetchSize ); } 318 if( stmt.execute( sql ) ) { // true:検索系 , false:更新系 319 resultSet = stmt.getResultSet(); 320 while( resultSet.next() ) { 321 sqlCount++ ; 322 String str = resultSet.getString(1); 323 if( display ) { println( str ); } 324 data.add( str ); 325 } 326 } 327 else { 328 // sqlCount = stmt.getUpdateCount(); // 5.3.9.0 (2011/09/01) 329 sqlCount += stmt.getUpdateCount(); 330 } 331 } 332 catch (SQLException ex) { 333 String errMsg = "SQL を実行できませんでした? + CR 334 + "errMsg=[" + ex.getMessage() + "]" + CR 335 + "errorCode=[" + ex.getErrorCode() + "] State=[" + ex.getSQLState() + "]" + CR 336 + "DBID=" + dbid + CR 337 + "SQL =" + sql ; 338 339 throw new RuntimeException( errMsg,ex ); 340 } 341 finally { 342 Closer.resultClose( resultSet ); 343 Closer.stmtClose( stmt ); 344 345 ConnectionFactory.remove( connection,dbid ); 346 } 347 return data; 348 } 349 350 /** 351 * ?で使用する Set オブジェクトを作?します? 352 * Exception 以外では、? Set<String[]> オブジェクトを返します? 353 * 354 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追? 355 * 356 * @param sql オリジナルのSQL? 357 * @param bulkKey ?処?置き換えるキー?? 358 * @param bulkType ?型(true)か?数字型(false)を指? 359 * @param setData ?処???なるSetオブジェク? 360 * 361 * @return オリジナルのSQL?に ?処????と置換したSQL??配? 362 */ 363 private String[] makeBulkQuery( final String sql, final String bulkKey, final boolean bulkType,final Set<String> setData ) { 364 String[] sqls = new String[ (setData.size()/MAX_BULK_SET) + 1 ]; 365 int idx = 0; 366 int cnt = 0; 367 368 StringBuilder buf = new StringBuilder(); 369 String bulkVal = null; 370 if( bulkType ) { // ??の場? 371 for( String key : setData ) { 372 cnt++; 373 buf.append( ",'" ).append( key ).append( "'" ); 374 if( cnt >= MAX_BULK_SET ) { 375 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす 376 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 377 cnt = 0; 378 buf = new StringBuilder(); 379 } 380 } 381 if( cnt > 0 ) { // きっちりで終わらな??? 382 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす 383 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 384 } 385 } 386 else { // 数字?場? 387 for( String key : setData ) { 388 cnt++; 389 buf.append( "," ).append( key ); 390 if( cnt >= MAX_BULK_SET ) { 391 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす 392 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 393 cnt = 0; 394 buf = new StringBuilder(); 395 } 396 } 397 if( cnt > 0 ) { // きっちりで終わらな??? 398 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす 399 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 400 } 401 } 402 // String bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす 403 404 // return sql.replace( "{@" + bulkKey + "}" ,bulkVal ); 405 return sqls; 406 } 407 408 /** 409 * プロセスの処?果のレポ?ト表現を返します? 410 * 処??ログラ?、?力件数、?力件数などの??です? 411 * こ???をそのまま、標準?力に出すことで、結果レポ?トと出来るよ? 412 * 形式で出してください? 413 * 414 * @return 処?果のレポ?? 415 */ 416 public String report() { 417 String report = "[" + getClass().getName() + "]" + CR 418 + TAB + "Action : " + actionCmd + CR 419 + TAB + "DBID : " + dbid + CR 420 + TAB + "sqlCount : " + sqlCount + CR 421 + TAB + "setCount : " + setCount + CR 422 + TAB + "outCount : " + outCount ; 423 424 return report ; 425 } 426 427 /** 428 * こ?クラスの使用方法を返します? 429 * 430 * @return こ?クラスの使用方? 431 */ 432 public String usage() { 433 StringBuilder buf = new StringBuilder(); 434 435 buf.append( "Process_BulkQueryは、データベ?スから読み取った?容を??処?るために? ).append( CR ); 436 buf.append( "ParamProcess のサブクラス(Process_DBParam)にセ?したり??したりす? ).append( CR ); 437 buf.append( "FirstProcess と、ChainProcess のインターフェースを両方持った?実?ラスです?" ).append( CR ); 438 buf.append( CR ); 439 buf.append( "こ?クラスは、上流から?下流への処???度しか実行されません? ).append( CR ); 440 buf.append( "FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します?" ).append( CR ); 441 buf.append( "ChainProcess は、その結果を取り?し?自??身の処?果と合せて?します?" ).append( CR ); 442 buf.append( CR ); 443 buf.append( "FirstProcess では?action は、query のみです?" ).append( CR ); 444 buf.append( " query は、指定?SQL?実行し、結果のSetをParamProcessに設定します?" ).append( CR ); 445 buf.append( "ChainProcess では?action は、query、bulkSet、minus、intersect が指定できます?" ).append( CR ); 446 buf.append( " query は、上記と同じです?" ).append( CR ); 447 buf.append( " minus は、?のSetから、SQL??実行結果を引き算し、結果Setを?設定します?" ).append( CR ); 448 buf.append( " intersect は、?のSetから、SQL??実行結果と重?る結果Setを?設定します?" ).append( CR ); 449 buf.append( " bulkSet は、?のSetを取り?し?SQL??して処?ます?" ).append( CR ); 450 buf.append( CR ); 451 buf.append( "流れ?は、query で検索し?minusまた?intersect でSetオブジェクトを?し?" ).append( CR ); 452 buf.append( "bulkSet で利用します?例えば、ORACLEから、ユニ?クキーのSetを作?し?" ).append( CR ); 453 buf.append( "SQLServerのユニ?クキーをminusした結果を?ORACLEからDELETEすれば、不要な" ).append( CR ); 454 buf.append( "??タを削除するなどの処?実行可能になります?また?単純に、query ?を?" ).append( CR ); 455 buf.append( "チェインすれば、単発のUPDATE?実行することが可能です?" ).append( CR ); 456 buf.append( CR ); 457 buf.append( "??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR ); 458 buf.append( "設定された接?Connection)を使用します?" ).append( CR ); 459 buf.append( CR ); 460 buf.append( "引数??中に空白を含??合?、ダブルコー??ション(\"\") で括って下さ??" ).append( CR ); 461 buf.append( "引数??の ?』?前後には、空白は挟めません。??key=value の様に" ).append( CR ); 462 buf.append( "繋げてください? ).append( CR ); 463 buf.append( CR ); 464 buf.append( "SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?" ).append( CR ); 465 buf.append( CR ).append( CR ); 466 467 buf.append( getArgument().usage() ).append( CR ); 468 469 return buf.toString(); 470 } 471 472 /** 473 * こ?クラスは、main メソ?から実行できません? 474 * 475 * @param args コマンド引数配? 476 */ 477 public static void main( final String[] args ) { 478 LogWriter.log( new Process_BulkQuery().usage() ); 479 } 480 }