Servus miteinander,
ich bin neu in dem Forum. Ich schreibe gerade an meiner Abschlussarbeit und hänge seit einiger Zeit an einem Problem.
Meine Aufgabe ist es, ein 'Fraud Detection' Framework für eine Telekommunikations Unternehmung zu erstellen.
Das Framework liest alle Telefontickets ein (Ticket = informationen zu einem Telefonat) und speichert diese in einer Datenbank via Hibernate als Objekt ab.
Dieser Vorgang arbeitet als eigener Thread.
Weiter sollen in das Framework Module eingesetzt werden, die nach Betrugsmuster prüfen.
Diese Module werden von einem Scheduler angestoßen (auch ein eigener Thread).
Das Framework soll permanent laufen.
Problemstellung:
Ich lasse das Framework momentan übernacht (oder länger) laufen um es zu testen. Dabei ist mir folgendes aufgefallen.
Der Verbrauch des 'Virtuellen Speichers' klettert unaufhörlich in die Höhe. Nach einer Nacht reserviert mein Programm 20GB Virtuellen Speicher im System.
Nach 3 Nächten 70GB Virtueller Speicher.
Der Heap Speicher Verbrauch bleibt dabei relativ konstant bei ca. 100MB - 200MB.
Beobachtet mit 'Jconsole' und 'htop' bzw 'top'.
Das Programm läuft auf einer Debian6 VM mit einer 10GB Partition, 500MB RAM und 500MB Swap.
-> Da stellt sich mir schon mal die Frage, wo nimmt die JavaVM den Speicher überhaupt her?
Ich vermute das Problem hängt mit dem einlesen der Tickets (FileStreamerThread.java) oder mit der Hibernate DB Anbindung zusammen...
Leider bin ich langsam mit meinem Latein am Ende...
Würde mich über jedern Tipp oder Rat freuen!!
Der Code ist leider etwas unleserlich, ich bitte das zu entschuldigen.
Klasse: FileStreamerThread.java (liest die Tickets ein)
Klasse: Scheduler.java (Startet die Module)
Klasse: HibernateUtil.java ( Steuert die DB Verbindung )
Klasse: FraudRepository.java (Stellt Funktionen für DB Zugriff zur Verfügung)
ich bin neu in dem Forum. Ich schreibe gerade an meiner Abschlussarbeit und hänge seit einiger Zeit an einem Problem.
Meine Aufgabe ist es, ein 'Fraud Detection' Framework für eine Telekommunikations Unternehmung zu erstellen.
Das Framework liest alle Telefontickets ein (Ticket = informationen zu einem Telefonat) und speichert diese in einer Datenbank via Hibernate als Objekt ab.
Dieser Vorgang arbeitet als eigener Thread.
Weiter sollen in das Framework Module eingesetzt werden, die nach Betrugsmuster prüfen.
Diese Module werden von einem Scheduler angestoßen (auch ein eigener Thread).
Das Framework soll permanent laufen.
Problemstellung:
Ich lasse das Framework momentan übernacht (oder länger) laufen um es zu testen. Dabei ist mir folgendes aufgefallen.
Der Verbrauch des 'Virtuellen Speichers' klettert unaufhörlich in die Höhe. Nach einer Nacht reserviert mein Programm 20GB Virtuellen Speicher im System.
Nach 3 Nächten 70GB Virtueller Speicher.
Der Heap Speicher Verbrauch bleibt dabei relativ konstant bei ca. 100MB - 200MB.
Beobachtet mit 'Jconsole' und 'htop' bzw 'top'.
Das Programm läuft auf einer Debian6 VM mit einer 10GB Partition, 500MB RAM und 500MB Swap.
-> Da stellt sich mir schon mal die Frage, wo nimmt die JavaVM den Speicher überhaupt her?
Ich vermute das Problem hängt mit dem einlesen der Tickets (FileStreamerThread.java) oder mit der Hibernate DB Anbindung zusammen...
Leider bin ich langsam mit meinem Latein am Ende...
Würde mich über jedern Tipp oder Rat freuen!!
Der Code ist leider etwas unleserlich, ich bitte das zu entschuldigen.
Klasse: FileStreamerThread.java (liest die Tickets ein)
Java:
public class FileStreamerThread extends Thread {
private static Logger logger = Logger.getLogger(FileStreamerThread.class);
private static SystemProperties sysProps = SystemProperties.getInstance();
private Framework framework = new Framework();
private static FileStreamerThread instance = null;
/** Befehl zum auslesen des System-Änderungsdatums einer Datei. */
String[] cmd = {
"bash",
"-c",
"ls -l " + sysProps.getTicketfile().getAbsolutePath()
+ " | grep grnti | awk \'{print $8}\'" };
/** Flag zum sauberen beenden des Threads */
volatile boolean stop = false;
/** Daten fuer das Input File */
FileInputStream fis = null;
DataInputStream dis = null;
BufferedReader br = null;
File file = sysProps.getTicketfile();
private String file_age = null;
/** SINGELTON! */
public static FileStreamerThread getInstance() {
if (instance == null)
instance = new FileStreamerThread();
return instance;
}
/**
* Wird benötigt, wenn der FileStream Thread vom Benutzer via Frontend neu
* gestartet wird.
*/
public void destroyFileStreamerThread() {
instance = null;
}
/**
* Inititalisiert den Dateizugriff
*
* @return true wenn erfolgreich
*/
public boolean setupReader() {
try {
fis = new FileInputStream(file);
dis = new DataInputStream(fis);
br = new BufferedReader(new InputStreamReader(dis));
try {
Process proc = Runtime.getRuntime().exec(cmd);
BufferedReader in = new BufferedReader(new InputStreamReader(
proc.getInputStream()));
file_age = in.readLine();
logger.info("Fileage set to: " + file_age);
} catch (IOException e) {
logger.warn("Could not set FileAge!");
return false;
}
return true;
} catch (FileNotFoundException e1) {
logger.error("Konnte die TicketDatei \'"
+ sysProps.getTicketfile().getAbsolutePath()
+ "\' nicht öffnen.");
if (!file.exists())
logger.error("Die TicketDatei \'"
+ sysProps.getTicketfile().getAbsolutePath()
+ "\' existiert nicht.");
return false;
}
}
/**
* Schließt den Dateizugriff
*/
public void shutdownReader() {
try {
br.close();
dis.close();
fis.close();
br = null;
dis = null;
fis = null;
} catch (IOException e) {
logger.error("Die Ticketdatei \'"
+ sysProps.getTicketfile().getAbsolutePath()
+ "\' konnte nicht geschlossen werden.");
}
}
/**
* Endlosschleife: Läuft als eigener Thread und überwacht/liest ständigt die
* Ticketdatei.
*
*/
public void run() {
/** buffert die einzelnen Elemente eines Tickets */
String[] curTickElements = null;
/** beinhaltet das letzte Element im Ticket */
int finalField = -1;
/** Ticket als String in Rohform */
String str = "";
if (setupReader()) {
/** FileZugriff erfolgreich... */
System.out.println("Starting Filestream.");
while (!stop) {
try {
if ((str = br.readLine()) != null) {
/** Trennt das Ticket in die einzelnen Elemente auf */
curTickElements = str.split(" ");
/** Definiert die Länge es Tickets */
finalField = curTickElements.length - 1;
if (curTickElements[0].contains("GO")) {
/** Aus Ausgehende Tickets sollen behandelt werden */
framework.cutRawTicket(finalField, curTickElements);
}
}
} catch (ArrayIndexOutOfBoundsException ae) {
logger
.debug("Ticket konnte nicht verarbeitet werden. Zu wenig Felder. ["
+ finalField + " Felder] \n" + str);
} catch (IOException e) {
logger.error("Fehler bei Zugriff auf die Ticketdatei \'"
+ sysProps.getTicketfile() + "\'.");
} catch (Exception e) {
logger
.debug("Unbekannter Fehler beim Verarbeiten des Tickets \'"
+ str
+ "\'. ("
+ finalField
+ ")\n"
+ e.toString() + "\n");
}
/**
* Erneuerung des Dateizugriffs (Ticketdatei). Ist der
* Filestream NULL und der thread aktiv (!stop), wird geprüft ob
* sich die erstellungszeit der Datei verändert hat 'file_age'.
* Wenn ja, erfolgt eine Erneuerung.
*
*/
try {
if ((!br.ready()) && (!stop)) {
try {
Process proc = Runtime.getRuntime().exec(cmd);
BufferedReader in = new BufferedReader(
new InputStreamReader(proc.getInputStream()));
String current_file_Age = "X"; // Init variable
current_file_Age = in.readLine();
if (!file_age.contains(current_file_Age)) {
logger.info(file_age + " != "
+ current_file_Age);
setupReader();
logger
.info("Zugriff auf die Ticketdatei erfolgreich erneuert \'"
+ sysProps.getTicketfile()
.getAbsolutePath()
+ "\'");
}
} catch (NullPointerException npe) {
logger
.error("Das Dateialter der Ticketdatei konnte nicht ermittelt werden. NullpointerException!\nDateizugriff wird erneuert");
setupReader();
}
catch (IOException e) {
logger
.error("Das Dateialter der Ticketdatei konnte nicht ermittelt werden. IOException!\nDateizugriff wird erneuert");
setupReader();
}
}
} catch (IOException e) {
logger
.error("Schwerwiegender Fehler beim Erneuern des Zugriffs auf die Ticketdatei \'"
+ sysProps.getTicketfile()
.getAbsolutePath()
+ "\'\n"
+ e.toString());
setupReader();
}
}
}
}
}
Klasse: Scheduler.java (Startet die Module)
Java:
public class Scheduler extends Thread {
private static Scheduler instance = null;
private SystemProperties sysProps = SystemProperties.getInstance();
private static HashMap<String, String> moduleNameAndTiming = new HashMap<String, String>();
private static Logger logger = Logger.getLogger(Scheduler.class);
private static String namespace = "de.rkom.frauddetection.modules.";
/** Flag zum sauberen beenden des Threads */
volatile boolean run = true;
private String[] module = null;
private SchedulerFactory sf = null;
private org.quartz.Scheduler sched = null;
/** Classic Singelton Pattern */
public static Scheduler getInstance() {
if (instance == null) {
instance = new Scheduler();
}
return instance;
}
public void run() {
try {
getModules();
schedule();
while (run) {
// Control Schleife
// System.out.println("Scheduler running: " +
// sched.isStarted());
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void schedule() throws SchedulerException {
sf = new StdSchedulerFactory();
sched = sf.getScheduler();
logger.info("------- Initializing ----------------------");
/** First we must get a reference to a scheduler */
logger.info("------- Initialization Complete -----------");
/** nächste volle Minute berechnen */
Date runTime = evenMinuteDate(new Date());
logger.info("------- Scheduling Job -------------------");
/** Liste der Module verarbeiten */
Iterator<?> iter = moduleNameAndTiming.entrySet().iterator();
while (iter.hasNext()) {
/**
* module[0] = <name> module[1] = <timing>
*/
module = iter.next().toString().split("=");
/** timing 0=minütlich 1=stündlich */
int timing = 0;
/** Untersuche den Intervall. Minutliche oder Stündliche ausführung? */
if (module[1].substring(module[1].length() - 1).contains("m"))
timing = 1;
else if (module[1].substring(module[1].length() - 1).contains("h"))
timing = 2;
else
logger.warn("Ungueltiges Intervall \'" + module[1]
+ "\' Intervall fuer das Modul \'" + module[0]
+ "\' nicht setzen. > Modul nicht aktiviert.");
/** Standard: Deaktiviert */
int intervall = 0;
try {
intervall = Integer.valueOf(module[1].substring(0, module[1]
.length() - 1));
} catch (Exception e) {
logger.warn("Konnte das Intervall fuer das Modul \'"
+ module[0]
+ "\' nicht setzen. > Modul nicht aktiviert.");
}
/** Intevall 0 bedeutet, modul ist deaktiviert */
if (intervall > 0) {
/** Modulklassen instanzieren und den Trigger definieren */
Class<? extends Job> moduleClass;
try {
moduleClass = (Class<? extends Job>) Class
.forName(namespace + module[0]);
JobDetail job = newJob(moduleClass).withIdentity(module[0],
"group1").build();
Trigger trigger = null;
if (timing == 1) {
System.out.println("\nInitialisiere das Modul \'"
+ module[0]
+ "\' im Intervall \'"
+ module[1]+ "\', zur naechsten vollen Minute ");
/** Trigger minütlich definieren */
trigger = newTrigger()
.startAt(runTime)
.forJob(job)
.withSchedule(
SimpleScheduleBuilder
.repeatMinutelyForever(intervall))
.build();
} else if (timing == 2) {
System.out.println("\nInitialisiere das Modul \'"
+ module[0]
+ "\' im Intervall \'"
+ module[1] + "\', zur naechsten vollen Minute ");
/** Trigger stündlich definieren */
trigger = newTrigger()
.startAt(runTime)
.forJob(job)
.withSchedule(
SimpleScheduleBuilder
.repeatHourlyForever(intervall))
.build();
}
/** Schedule erstellen mit Trigger */
sched.scheduleJob(job, trigger);
} catch (ClassNotFoundException e) {
logger.error("Konnte die Klasse \'" + namespace + module[0]
+ "\' nicht finden.\nModul wird uebersprungen");
} catch (SchedulerException e) {
logger.error("Fehler beim schedulen des Jobs \'"
+ namespace + module[0] + "\'\n");
e.printStackTrace();
}
/** Output ausgeschaltete Module */
} else if (intervall == 0) {
System.out.println("Modul \'" + module[0]
+ "\' in Konfig deaktiviert.");
}
/** Scheduler starten */
try {
sched.start();
logger.info("------- Started Scheduler -----------------");
} catch (SchedulerException e) {
logger.error("------- !Scheduler ERRORED!-----------------");
}
}
}
}
Klasse: HibernateUtil.java ( Steuert die DB Verbindung )
Java:
public class HibernateUtil {
/**
* @param args
*/
private static final SessionFactory sessionFactory = buildSessionFactory();
private static SystemProperties sysProps = null;
public static SessionFactory buildSessionFactory() {
try {
return configureHibernateConnection().buildSessionFactory();
} catch (HibernateException ex) {
throw new ExceptionInInitializerError(ex);
}
}
public static SessionFactory getSessionFactory() {
sessionFactory.openSession();
return sessionFactory;
}
public static Session getSession(){
Session s = sessionFactory.openSession();
return s;
}
public static Configuration configureHibernateConnection() {
sysProps = SystemProperties.getInstance();
Configuration cfg = new Configuration();
cfg.addClass(de.rkom.frauddetection.persistence.TicketObject.class);
cfg.addClass(de.rkom.frauddetection.persistence.ContractData.class);
int i = 0;
cfg.setProperty("hibernate.transaction.factory_class",
"org.hibernate.transaction.JDBCTransactionFactory");
cfg.setProperty("connection.provider_class", "org.hibernate.connection.C3P0ConnectionProvider" );
cfg.setProperty("c3p0.minPoolSize", "40" );
try {
cfg.setProperty("hbm2ddl.auto", sysProps.getHib_hbm2ddl_auto());
} catch (NullPointerException e) {
System.out
.println("Konnte den Wert \'hbm2ddl.auto\' der Hibernate Konfiguration nicht setzen. Pruefen Sie die \'fraudConfig\'");
cfg.setProperty("hbm2ddl.auto", "update");
i++;
}
try {
cfg.setProperty("show_sql", sysProps.getHib_showSql());
} catch (NullPointerException e) {
cfg.setProperty("show_sql", "false");
i++;
}
try {
cfg.setProperty("current_session_context_class", sysProps
.getHib_sessionContextClass());
} catch (NullPointerException e) {
cfg.setProperty("current_session_context_class", "thread");
i++;
}
try {
cfg.setProperty("cache.provider_class", sysProps
.getHib_providerClass());
} catch (NullPointerException e) {
cfg.setProperty("cache.provider_class",
"org.hibernate.cache.NoCacheProvider");
i++;
}
try {
cfg.setProperty("connection.pool_size", sysProps.getHib_poolsize());
} catch (NullPointerException e) {
cfg.setProperty("connection.pool_size", "10");
i++;
}
try {
cfg.setProperty("hibernate.connection.password", sysProps
.getHib_pass());
} catch (NullPointerException e) {
cfg.setProperty("hibernate.connection.password", "postgres");
i++;
}
try {
cfg.setProperty("hibernate.connection.username", sysProps
.getHib_user());
} catch (NullPointerException e) {
cfg.setProperty("hibernate.connection.username", "fraudy");
i++;
}
try {
cfg.setProperty("hibernate.connection.url", sysProps.getHib_url());
} catch (NullPointerException e) {
cfg.setProperty("hibernate.connection.url",
"jdbc:postgresql://localhost/postgres");
i++;
}
try {
cfg.setProperty("hibernate.connection.driver_class", sysProps
.getHib_driver());
} catch (NullPointerException e) {
cfg.setProperty("hibernate.connection.driver_class",
"org.postgresql.Driver");
i++;
}
try {
cfg.setProperty("hibernate.dialect", sysProps.getHib_dialect());
} catch (NullPointerException e) {
cfg.setProperty("hibernate.dialect",
"org.hibernate.dialect.PostgreSQLDialect");
i++;
}
// SchemaExport schemeExp = new SchemaExport(cfg);
// schemeExp.execute(true, true, false, true);
// schemeExp.create(true, true);
cfg.buildMappings();
return cfg;
}
}
Klasse: FraudRepository.java (Stellt Funktionen für DB Zugriff zur Verfügung)
Java:
/**
* Hibernate Fraud Repository
*
* Stellt Funktionen zu verfügung um den Datenbankzugriff zu handeln.
*
* @author ralph
*
*/
public class FraudRepository {
private static Logger logger = Logger.getLogger(FraudRepository.class);
public FraudRepository() {
}
public void deleteObject(Class<?> clazz, int key) {
Session session = HibernateUtil.getSession();
session.beginTransaction();
Object o = null;
try {
o = session.load(clazz, key);
session.delete(o);
} catch (RuntimeException e) {
logger.warn("Konnte das Objekt \'" + key + "\' der Klasse \'"
+ clazz + "\' nicht loeschen. Evtenuell nicht vorhanden? \n" );
}
session.close();
}
/**
* Speichert Objekte ab Aufruf vom FrameWork
*
* @param o
*/
public void createAndStore(Object o) {
Session session = null;
try {
session = HibernateUtil.getSession();
session.beginTransaction();
session.saveOrUpdate(o);
session.getTransaction().commit();
} catch (RuntimeException e) {
logger.error("Konnte das Objekt <"+ o.getClass().toString() +"> nicht abspeichern \n"
+ e.toString());
} finally {
// session.flush();
session.close();
}
}
/**
* Liefert alle Elemente der persistenten Klasse zurück
*
* @param target
* (from TicketObject)
* @return
*/
public List<?> listObjects(String target) {
Session session = HibernateUtil.getSession();
session.beginTransaction();
List<?> result = session.createQuery(target).list();
session.getTransaction().commit();
// session.flush();
session.close();
return result;
}
/**
* Liefert das Objekt aus der Datenbank zurück
*
* @param clazz
* TicketObject oder CustomerData
* @param ID
* > Vertragsnummer
* @return
*/
public Object getObjectById(Class<?> clazz, int ID) {
Session session = HibernateUtil.getSession();
session.beginTransaction();
Object o = null;
try {
o = session.get(clazz, ID);
} catch (RuntimeException e) {
logger.warn("Konnte das Objekt \'" + ID + "\' der Klasse \'"
+ clazz + "\' nicht laden \n" + e.toString());
}
// session.flush();
session.close();
return o;
}
@SuppressWarnings("unchecked")
public List<TicketObject> getAllTickets() {
Session session = HibernateUtil.getSession();
session.beginTransaction();
List<TicketObject> result = (List<TicketObject>) session
.createCriteria(TicketObject.class).list();
HibernateUtil.getSessionFactory().close();
return result;
}
/**
* Löscht Tickets aus der Datenbank, die älter als 6 Monate sind.
*/
public void clearOldTickets() {
Session session = HibernateUtil.getSession();
session.beginTransaction();
Query query = session
.createSQLQuery("DELETE from TICKETS WHERE TIMESTAMP < (now() - ('6 month'::interval))");
// session.flush();
session.close();
}
/**
* Liefert Tickets des letzen Monats
*
* @return
*/
public List getTicketsLastMonth() {
Session session = HibernateUtil.getSession();
session.beginTransaction();
Query query = session
.createSQLQuery("SELECT * from TICKETS WHERE TIMESTAMP > (now() - ('1 month'::interval))");
// session.flush();
List TicketList = query.list();
session.close();
return TicketList;
}
/**
* Liefert Tickets der letzten Woche
*
* @return
*/
public List getTicketsLastWeek() throws Exception {
Session session = HibernateUtil.getSession();
session.beginTransaction();
Query query = session
.createSQLQuery("SELECT * from TICKETS WHERE TIMESTAMP > (now() - (\'1 week\'::interval))");
List TicketList = query.list();
session.close();
return TicketList;
}
@SuppressWarnings("unchecked")
public List<TicketObject> getTicketsByContract(int contract) {
List<TicketObject> result = null;
Session session = HibernateUtil.getSession();
session.beginTransaction();
Criteria criteria = session.createCriteria(TicketObject.class);
criteria.add(Restrictions.eq("rfonvertragid", contract));
result = (List<TicketObject>) criteria.list();
session.close();
return result;
}
/**
* Selects all Tickets in the given period.
*
* @param days
* >> amount of days
* @return
*/
@SuppressWarnings("unchecked")
public List<TicketObject> getTicketsLastDays(int days) {
List<TicketObject> result = null;
Session session = HibernateUtil.getSession();
Date now = new Date();
Calendar cal = Calendar.getInstance();
cal.add(Calendar.DATE, -days);
Date low = new Date(cal.getTimeInMillis());
session.beginTransaction();
Criteria criteria = session.createCriteria(TicketObject.class);
criteria.add(org.hibernate.criterion.Expression.ge("timestamp", low));
criteria.add(org.hibernate.criterion.Expression.le("timestamp", now));
result = criteria.list();
session.close();
return result;
}
/**
* Liefert alle Tickets zurück
*
* @return
*/
public List<TicketObject> getTickets() {
Session session = HibernateUtil.getSession();
session.beginTransaction();
Query query = session.createSQLQuery("SELECT * from TICKETS");
List<TicketObject> TicketList = query.list();
session.close();
return TicketList;
}
/**
* Liefert das erste Ticket zurück. Aufruf für den Status aus der MAIN
*
* @return
*/
public String getFirstTicket() {
// Session session =
// HibernateUtil.getSessionFactory().getCurrentSession();
String result = null;
Session session = HibernateUtil.getSession();
session.beginTransaction();
Query query = session
.createSQLQuery("SELECT timestamp from TICKETS LIMIT 1");
// session.flush();
result = query.list().toString();
session.close();
return result;
}
/**
* Liefert die Anzahl der Tickets zurück
*
* @return
*/
public String getAmountTickets() {
// Session session =
// HibernateUtil.getSessionFactory().getCurrentSession();
String result = null;
Session session = HibernateUtil.getSession();
session.beginTransaction();
Query query = session.createSQLQuery("SELECT COUNT(*) from TICKETS");
result = query.list().toString();
session.close();
return result;
}
}