001package org.opengion.plugin.daemon; 002 003import java.util.Date; 004 005import javax.jms.QueueSession; 006 007// import org.hsqldb.lib.StringUtil; 008import org.opengion.fukurou.util.StringUtil; // 7.0.6.0 (2019/10/07) 009import org.opengion.fukurou.queue.QueueInfo; 010import org.opengion.fukurou.queue.QueueSend; 011import org.opengion.fukurou.queue.QueueSendFactory; 012import org.opengion.fukurou.util.HybsTimerTask; 013import org.opengion.hayabusa.common.HybsSystem; 014import org.opengion.hayabusa.queue.DBAccessQueue; 015 016/** 017 * メッセージキュー送信 018 * メッセージキュー送信テーブルを監視して、 019 * 送信処理を行います。 020 * 021 * @og.group メッセージ連携 022 * 023 * @og.rev 5.10.15.0 (2019/08/30) 新規作成 024 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動 025 * 026 * @version 5.0 027 * @author oota 028 * @since JDK7 029 * 030 */ 031public class Daemon_QueueSend extends HybsTimerTask { 032 private int loopCnt = 0; 033 private static final int LOOP_COUNTER = 24; 034 private QueueSend queueSend; 035 036 private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 037 private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" ); 038 private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" ); 039 private final String USER_ID = "CYYYYY"; 040 private final String PG_ID = "DMN_QueSnd"; 041 private final String DMN_NAME = "QueueReceiveDMN"; 042 private final DBAccessQueue dbAccessQueue; 043 044 /** 045 * コンストラクター 046 * 初期処理を行います。 047 */ 048 public Daemon_QueueSend(){ 049 dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 050 } 051 /** 052 * 開始処理 053 * タイマータスクのデーモン処理の開始ポイントです。 054 */ 055 @Override 056 protected void startDaemon() { 057 if (loopCnt % LOOP_COUNTER == 0) { 058 loopCnt = 1; 059 System.out.println(); 060 System.out.println(toString() + " " + new Date() + ""); 061 } else { 062 // メッセージキュー送信管理テーブルから、送信対象のレコードを取得 063 final String[][] vals = dbAccessQueue.selectGE65(); 064 065 // 取得データ分の繰り返し処理を実行する 066 for(int i = 0; i < vals.length; i++) { 067 final String[] record = vals[i]; 068 069 // GE65から取得した値を変数に格納 070 final String ykno = record[0]; 071 final String queueId = record[1]; 072 final String message = record[2]; 073 final String dedupliId = record[3]; 074 final String queSyu = record[4]; 075 final String jmsUrl = record[5]; 076 077 final String queueType = queSyu.toUpperCase(); 078 queueSend = QueueSendFactory.newQueueSend(queueType); 079 080 // 接続処理 081 queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 082 083 // メッセージ送信管理テーブルから取得したデータを送信実装予定 084 final QueueInfo queueInfo = new QueueInfo(); 085 086 // 応答確認種別 087 if("MQ".equals(queueType)){ 088 // MQメッセージサーバ指定時 089 queueInfo.setMqTransacted(false); 090 queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE); 091 // キュー名 092 queueInfo.setMqQueueName(queueId); 093 }else if("SQS".equals(queueType)){ 094 // SQSメッセージサーバ指定時 095 // グループID 096 queueInfo.setSqsFifoGroupId(queueId); 097 if(!StringUtil.isEmpty(dedupliId)) { 098 // 重複排除ID 099 // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる) 100 queueInfo.setSqsFifoDedupliId(dedupliId); 101 } 102 } 103 104 // メッセージ 105 queueInfo.setMessage(message); 106 107 // 完了フラグを処理中:2に更新 108 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS); 109 110 // メッセージ送信処理 111 try{ 112 queueSend.sendMessage(queueInfo); 113 114 // 完了フラグを完了:3に更新 115 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END); 116 117 }catch(Exception e) { 118 // 完了フラグをエラー:4に更新して、エラー情報を登録 119 dbAccessQueue.updateGE66Error(ykno, e.getMessage()); 120 } 121 } 122 123 // クローズ処理 124 queueSend.close(); 125 126 loopCnt++; 127 } 128 } 129}