001 /*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016 package org.opengion.fukurou.process;
017
018 import org.opengion.fukurou.util.Argument;
019 import org.opengion.fukurou.util.SystemParameter;
020 import org.opengion.fukurou.util.StringUtil;
021 import org.opengion.fukurou.util.LogWriter;
022 import org.opengion.fukurou.util.HybsEntry ;
023 import org.opengion.fukurou.util.Closer;
024 import org.opengion.fukurou.model.Formatter;
025 import org.opengion.fukurou.db.ConnectionFactory;
026
027 import java.util.Map ;
028 import java.util.LinkedHashMap ;
029 import java.util.Set ;
030 import java.util.HashSet ;
031
032 import java.sql.Connection;
033 import java.sql.PreparedStatement;
034 import java.sql.ParameterMetaData;
035 import java.sql.SQLException;
036
037 /**
038 * Process_DBWriter は、上流から受け取ったデータをデータベ?スに書き込?
039 * CainProcess インターフェースの実?ラスです?
040 *
041 * 上?プロセスチェインの??タは上流から下流へと渡されます?)から受け取っ?
042 * LineModel を?に、データベ?スへの書き込みを行います?
043 *
044 * ??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に
045 * 設定された接?Connection)を使用します?
046 *
047 * 引数??中にスペ?スを含??合?、ダブルコー??ション("") で括って下さ??
048 * 引数??の ?』?前後には、スペ?スは挟めません。??key=value の様に
049 * 繋げてください?
050 *
051 * SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?
052 *
053 * @og.formSample
054 * Process_DBWriter -dbid=DBGE -table=GE41
055 *
056 * [ -dbid=DB接続ID ] ??-dbid=DBGE (? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規?
057 * [ -table=登録??ブルID ] ???????する?合?不要?INSERT する場合???ブルID
058 * [ -sql=検索SQL? ] ??-sql="UPDATE GE41 SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME]
059 * WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]"
060 * [ -sqlFile=登録SQLファイル ] ??-sqlFile=update.sql
061 * ?? -sql ?-sqlFile が指定されな??合??table で????ブルに全カラ?insert です?
062 * [ -sql_XXXX=固定? ] ??-sql_SYSTEM_ID=GE
063 * SQL?の{@XXXX}??を指定?固定?で置き換えます?
064 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'
065 * [ -const_XXXX=固定? ] ??-const_FGJ=1
066 * LineModel のキー(const_ に続く??)の値に、固定?を設定します?
067 * キーが異なれ?、?のカラ?を指定できます?
068 * [ -omitClms=AAA,BBB,… ] ??-omitClms=UNIQ,FGJ,DYSET
069 * -table 属?でINSERT?自動作?する場合?取り除くカラ??
070 * カンマ区?で??できます?
071 * [ -commitCnt=commit処?定] ???数毎にコミットを発行します?0 の場合?、終?でコミットしません?
072 * [ -display=false|true ] ??結果を標準?力に表示する(true)かしな?false)?初期値:false[表示しない])
073 *
074 * @version 4.0
075 * @author Kazuhiko Hasegawa
076 * @since JDK5.0,
077 */
078 public class Process_DBWriter extends AbstractProcess implements ChainProcess {
079 private static final String CNST_KEY = "const_" ;
080 private static final String SQL_KEY = "sql_" ;
081
082 private Connection connection = null;
083 private PreparedStatement pstmt = null;
084 private ParameterMetaData pMeta = null; // 5.1.1.0 (2009/11/11) setObject に、Type を渡す?(PostgreSQL対?
085 private boolean useParamMetaData = false; // 5.1.1.0 (2009/11/11) setObject に、Type を渡す?(PostgreSQL対?
086
087 private String dbid = null;
088 private String sql = null;
089 private String table = null;
090 private int[] clmNos = null; // ファイルのヘッ??のカラ?号
091 private int commitCnt = 0; // コミットするまとめ件数
092 private boolean display = false; // 表示しな?
093
094 private String[] cnstClm = null; // 固定?を設定するカラ?
095 private int[] cnstClmNos = null; // 固定?を設定するカラ?号
096 private String[] constVal = null; // カラ?号に対応した固定?
097
098 private boolean firstRow = true; // ??の?目
099 private int count = 0;
100 private String[] omitClms = null; // 4.0.0.0 (2007/09/21) table ?時に取り除くカラ?
101
102 private static final Map<String,String> mustProparty ; // ?プロパティ???チェ?用 Map
103 private static final Map<String,String> usableProparty ; // ?プロパティ?整合?チェ? Map
104
105 static {
106 mustProparty = new LinkedHashMap<String,String>();
107
108 usableProparty = new LinkedHashMap<String,String>();
109 usableProparty.put( "dbid", "Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? );
110 usableProparty.put( "table", "INSERT する場合???ブルID SQL??する?合?不要?" );
111 usableProparty.put( "sql", "更新SQL?sql or sqlFile ??)" +
112 CR + "? \"UPDATE GE41 " +
113 CR + "SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME] " +
114 CR + "WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]\"" );
115 usableProparty.put( "sqlFile", "登録SQLファイル(sql or sqlFile ??)? update.sql" );
116 usableProparty.put( "sql_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" +
117 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" );
118 usableProparty.put( "const_", "LineModel のキー(const_ に続く??)の値に、固定?? +
119 CR + "設定します?キーが異なれ?、?のカラ?を指定できます?" +
120 CR + "? -sql_SYSTEM_ID=GE" );
121 // 4.0.0.0 (2007/09/21) 属?を追?
122 usableProparty.put( "omitClms", "-table 属?でINSERT?自動作?する場合?取り除くカラ?? +
123 CR + "カンマ区?で??できます?" +
124 CR + "? -omitClms=UNIQ,FGJ,DYSET" );
125 usableProparty.put( "commitCnt", "?数毎にコミットを発行します?" +
126 CR + "0 の場合?、終?でコミットしません(初期値:0)" );
127 usableProparty.put( "display", "結果を標準?力に表示する(true)かしな?false)? +
128 CR + "(初期値:false:表示しな?" );
129 }
130
131 /**
132 * ?ォルトコンストラクター?
133 * こ?クラスは、動??されます??ォルトコンストラクターで?
134 * super クラスに対して、?な初期化を行っておきます?
135 *
136 */
137 public Process_DBWriter() {
138 super( "org.opengion.fukurou.process.Process_DBWriter",mustProparty,usableProparty );
139 }
140
141 /**
142 * プロセスの初期化を行います?初めに??、呼び出されます?
143 * 初期処?ファイルオープン??オープン?に使用します?
144 *
145 * @og.rev 4.0.0.0 (2007/09/21) omitClms 属?を追?
146 * @og.rev 5.1.1.0 (2009/11/11) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
147 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData ?ConnectionFactory経由で取得?(PostgreSQL対?
148 *
149 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク?
150 */
151 public void init( final ParamProcess paramProcess ) {
152 Argument arg = getArgument();
153
154 table = arg.getProparty("table");
155 sql = arg.getFileProparty("sql","sqlFile",false);
156 commitCnt = arg.getProparty("commitCnt",commitCnt);
157 display = arg.getProparty("display",display);
158
159 dbid = arg.getProparty("dbid");
160 connection = paramProcess.getConnection( dbid );
161 // 5.1.1.0 (2009/11/11) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
162 // useParamMetaData = ApplicationInfo.useParameterMetaData( connection );
163 useParamMetaData = ConnectionFactory.useParameterMetaData( dbid ); // 5.3.8.0 (2011/08/01)
164
165 // 取り除くカラ?リストを配?に変換します?
166 String tempClms = arg.getProparty("omitClms",null);
167 if( tempClms != null ) {
168 omitClms = StringUtil.csv2Array( tempClms );
169 }
170
171 if( sql == null && table == null ) {
172 String errMsg = "sql を指定しな??合?、table を??してください?;
173 throw new RuntimeException( errMsg );
174 }
175
176 // 3.8.0.1 (2005/06/17) {@DATE.XXXX} 変換処??追?
177 // {@DATE.YMDH} などの??を?yyyyMMddHHmmss 型?日付に置き換えます?
178 // SQL?? {@XXXX} ??の固定?への置き換?
179 HybsEntry[] entry =arg.getEntrys(SQL_KEY); // 配?
180 SystemParameter sysParam = new SystemParameter( sql );
181 sql = sysParam.replace( entry );
182
183 HybsEntry[] cnstKey = arg.getEntrys( CNST_KEY ); // 配?
184 int csize = cnstKey.length;
185 cnstClm = new String[csize];
186 constVal = new String[csize];
187 for( int i=0; i<csize; i++ ) {
188 cnstClm[i] = cnstKey[i].getKey();
189 constVal[i] = cnstKey[i].getValue();
190 }
191 }
192
193 /**
194 * プロセスの終?行います??に??、呼び出されます?
195 * 終???ファイルクローズ??クローズ?に使用します?
196 *
197 * @og.rev 4.0.0.0 (2007/11/27) commit,rollback,remove 処?追?
198 * @og.rev 5.1.1.0 (2009/11/11) pMeta のクリア
199 *
200 * @param isOK ト?タルで、OK?たかど?[true:成功/false:失敗]
201 */
202 public void end( final boolean isOK ) {
203 boolean flag = Closer.stmtClose( pstmt );
204 pstmt = null;
205 pMeta = null; // 5.1.1.0 (2009/11/11)
206
207 if( isOK ) {
208 Closer.commit( connection );
209 }
210 else {
211 Closer.rollback( connection );
212 }
213 ConnectionFactory.remove( connection,dbid );
214
215 if( !flag ) {
216 String errMsg = "ス??トメントをクローズ出来ません?;
217 throw new RuntimeException( errMsg );
218 }
219 }
220
221 /**
222 * 引数の LineModel を??るメソ?です?
223 * 変換処?? LineModel を返します?
224 * 後続??行わな?????タのフィルタリングを行う場?は?
225 * null ??タを返します?つまり?null ??タは、後続??行わな?
226 * フラグの代わりにも使用して?す?
227 * なお?変換処?? LineModel と、オリジナルの LineModel が?
228 * 同?、コピ?(クローン)か?、各処?ソ??決めて?す?
229 * ドキュメントに明記されて???合?、副作用が問題になる?合??
230 * ???とに自?コピ?(クローン)して下さ??
231 *
232 * @og.rev 5.1.1.0 (2009/11/11) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
233 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData setNull 対?PostgreSQL対?
234 *
235 * @param data オリジナルのLineModel
236 *
237 * @return 処?換後?LineModel
238 */
239 public LineModel action( final LineModel data ) {
240 count++ ;
241 // if( display ) { println( data.dataLine() ); }
242 try {
243 if( firstRow ) {
244 pstmt = makePrepareStatement( table,data );
245 // 5.1.1.0 (2009/11/11) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
246 if( useParamMetaData ) {
247 pMeta = pstmt.getParameterMetaData();
248 }
249
250 int size = cnstClm.length;
251 cnstClmNos = new int[size];
252 for( int i=0; i<size; i++ ) {
253 cnstClmNos[i] = data.getColumnNo( cnstClm[i] );
254 }
255
256 firstRow = false;
257 }
258
259 // 固定?置き換え??
260 for( int j=0; j<cnstClmNos.length; j++ ) {
261 data.setValue( cnstClmNos[j],constVal[j] );
262 }
263
264 // 5.1.1.0 (2009/11/11) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
265 if( useParamMetaData ) {
266 for( int i=0; i<clmNos.length; i++ ) {
267 int type = pMeta.getParameterType( i+1 );
268 // 5.3.8.0 (2011/08/01) setNull 対?
269 // pstmt.setObject( i+1,data.getValue(clmNos[i]),type );
270 Object val = data.getValue(clmNos[i]);
271 if( val == null || ( val instanceof String && ((String)val).isEmpty() ) ) {
272 pstmt.setNull( i+1, type );
273 }
274 else {
275 pstmt.setObject( i+1, val, type );
276 }
277 }
278 }
279 else {
280 for( int i=0; i<clmNos.length; i++ ) {
281 pstmt.setObject( i+1,data.getValue(clmNos[i]) );
282 }
283 }
284
285 pstmt.execute();
286 if( commitCnt > 0 && ( count%commitCnt == 0 ) ) {
287 Closer.commit( connection );
288 }
289 }
290 catch (SQLException ex) {
291 String errMsg = "sql=[" + sql + "]" + CR +
292 "errorCode=[" + ex.getErrorCode() + "] State=[" +
293 ex.getSQLState() + "]" + CR ;
294 throw new RuntimeException( errMsg,ex );
295 }
296 if( display ) { println( data.dataLine() ); } // 5.1.2.0 (2010/01/01) display の条件変更
297 return data;
298 }
299
300 /**
301 * ?で使用する PreparedStatement を作?します?
302 * 引数?? SQL また?、LineModel から作?した SQL より構築します?
303 *
304 * @og.rev 4.0.0.0 (2007/09/21) omitClms 属?を追?
305 *
306 * @param table 処?象の??ブルID
307 * @param data 処?象のLineModel
308 *
309 * @return PreparedStatementオブジェク?
310 */
311 private PreparedStatement makePrepareStatement( final String table,final LineModel data ) {
312 if( sql == null ) {
313 StringBuilder buf = new StringBuilder();
314 String[] names = data.getNames();
315
316 // カラ?取り除く??
317 if( omitClms != null ) {
318 Set<String> set = new HashSet<String>();
319 for( int i=0; i<names.length; i++ ) {
320 set.add( names[i] );
321 }
322 for( int i=0; i<omitClms.length; i++ ) {
323 set.remove( omitClms[i] );
324 }
325 names = set.toArray( new String[set.size()] );
326 }
327 int size = names.length;
328
329 buf.append( "INSERT INTO " ).append( table ).append( " (" );
330 buf.append( names[0] );
331 for( int i=1; i<size; i++ ) {
332 buf.append( "," ).append( names[i] );
333 }
334 buf.append( " ) VALUES ( ?" );
335 for( int i=1; i<size; i++ ) {
336 buf.append( ",?" );
337 }
338 buf.append( " )" );
339 sql = buf.toString();
340
341 // カラ?号を設定します?
342 clmNos = new int[size];
343 for( int i=0; i<size; i++ ) {
344 clmNos[i] = data.getColumnNo( names[i] ); // 4.0.0.0 (2007/09/21)
345 }
346 }
347 else {
348 Formatter format = new Formatter( data );
349 format.setFormat( sql );
350 sql = format.getQueryFormatString();
351 clmNos = format.getClmNos();
352 }
353
354 final PreparedStatement ps ;
355 try {
356 ps = connection.prepareStatement( sql );
357 }
358 catch (SQLException ex) {
359 String errMsg = "PreparedStatement を取得できませんでした? + CR
360 + "sql=[" + sql + "]" + CR
361 + "table=[" + table + "]" + CR
362 + "nameLine=[" + data.nameLine() + "]" ;
363 throw new RuntimeException( errMsg,ex );
364 }
365
366 return ps;
367 }
368
369 /**
370 * プロセスの処?果のレポ?ト表現を返します?
371 * 処??ログラ?、?力件数、?力件数などの??です?
372 * こ???をそのまま、標準?力に出すことで、結果レポ?トと出来るよ?
373 * 形式で出してください?
374 *
375 * @return 処?果のレポ??
376 */
377 public String report() {
378 String report = "[" + getClass().getName() + "]" + CR
379 + TAB + "DBID : " + dbid + CR
380 + TAB + "Output Count : " + count ;
381
382 return report ;
383 }
384
385 /**
386 * こ?クラスの使用方法を返します?
387 *
388 * @return こ?クラスの使用方?
389 */
390 public String usage() {
391 StringBuilder buf = new StringBuilder();
392
393 buf.append( "Process_DBWriter は、上流から受け取ったデータをデータベ?スに書き込? ).append( CR );
394 buf.append( "CainProcess インターフェースの実?ラスです?" ).append( CR );
395 buf.append( CR );
396 buf.append( "上?プロセスチェインの??タは上流から下流へと渡されます?)から" ).append( CR );
397 buf.append( "受け取っ?LineModel を?に、データベ?スへの書き込みを行います?" ).append( CR );
398 buf.append( CR );
399 buf.append( "??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR );
400 buf.append( "設定された接?Connection)を使用します?" ).append( CR );
401 buf.append( CR );
402 buf.append( "引数??中に空白を含??合?、ダブルコー??ション(\"\") で括って下さ??" ).append( CR );
403 buf.append( "引数??の ?』?前後には、空白は挟めません。??key=value の様に" ).append( CR );
404 buf.append( "繋げてください? ).append( CR );
405 buf.append( CR );
406 buf.append( "SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?" ).append( CR );
407 buf.append( CR ).append( CR );
408 buf.append( getArgument().usage() ).append( CR );
409
410 return buf.toString();
411 }
412
413 /**
414 * こ?クラスは、main メソ?から実行できません?
415 *
416 * @param args コマンド引数配?
417 */
418 public static void main( final String[] args ) {
419 LogWriter.log( new Process_DBWriter().usage() );
420 }
421 }