/**
 * Copyright (c) 2006 - 2008 Smaxe Ltd (www.smaxe.com).
 * All rights reserved.
 */
package com.smaxe.app.uv.downloader;

import com.smaxe.logger.ILogger;
import com.smaxe.logger.support.Loggers;
import com.smaxe.uv.UrlInfo;
import com.smaxe.uv.client.INetConnection;
import com.smaxe.uv.client.INetStream;
import com.smaxe.uv.client.NetConnection;
import com.smaxe.uv.client.NetStream;
import com.smaxe.uv.client.video.FlvVideo;
import com.smaxe.uv.communication.Protocol;
import com.smaxe.uv.rtmp.core.Status;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * <code>RtmpDownloader</code> - downloads audio/video stream from the RTMP/RTMPT-enabled
 * server to the local FLV file.
 *
 * @author Andrei Sochirca
 */
public final class RtmpDownloader
{
    /** Interval (in milliseconds) at which <code>Future#get</code> polls the task state. */
    private static final long GET_POLL_MILLIS = 100L;

    /** Interval (in milliseconds) at which the worker thread polls the task state. */
    private static final long RUN_POLL_MILLIS = 200L;

    /**
     * <code>DownloadTask</code> - download task.
     * <p> The task is both the {@link Runnable} executed on the worker thread and the
     * {@link Future} handed back to the caller. Its state flags are written by the
     * connection/stream callbacks and polled by {@link #run()} and the <code>get</code>
     * methods, hence they are <code>volatile</code>.
     */
    private final class DownloadTask implements Future<Boolean>, Runnable
    {
        /**
         * <code>NetConnectionListener</code> - {@link NetConnection} listener.
         * <p> Starts playing the stream into the FLV file once the connection is
         * established and marks the enclosing task as finished on errors, on
         * connection close and when the stream stops.
         */
        private class NetConnectionListener extends NetConnection.ListenerAdapter
        {
            // fields
            private final FlvVideo flvVideo;
            private final Object stream;

            /**
             * Constructor.
             *
             * @param stream stream to play
             * @param flvVideo file the stream is played into
             */
            public NetConnectionListener(final Object stream, final FlvVideo flvVideo)
            {
                this.flvVideo = flvVideo;
                this.stream = stream;
            }

            @Override
            public void onAsyncError(final INetConnection source, final String message, final Exception e)
            {
                logger.log(ILogger.DEBUG, "NetConnection#onAsyncError: " + message, e);

                disconnected = true;
            }

            @Override
            public void onIOError(final INetConnection source, final String message)
            {
                logger.log(ILogger.DEBUG, "NetConnection#onIOError: " + message, null);

                disconnected = true;
            }

            @Override
            public void onNetStatus(final INetConnection source, final Map<String, Object> info)
            {
                logger.log(ILogger.DEBUG, "NetConnection#onNetStatus: " + info, null);

                final Object code = info.get(Status.CODE);

                if (NetConnection.CONNECT_SUCCESS.equals(code))
                {
                    // The stream is created and played on the dispatcher thread,
                    // not on the thread that delivers this callback.
                    dispatcher.execute(new Runnable()
                    {
                        public void run()
                        {
                            final NetStream netStream = new NetStream(source);

                            netStream.addEventListener(new NetStream.ListenerAdapter()
                            {
                                @Override
                                public void onNetStatus(final INetStream source, final Map<String, Object> info)
                                {
                                    logger.log(ILogger.DEBUG, "NetStream#onNetStatus: " + info, null);

                                    final String streamCode = (String) info.get(Status.CODE);

                                    // NetStream.PLAY_START (and any other code) needs no action.
                                    if (NetStream.PLAY_STOP.equals(streamCode) || NetStream.UNPUBLISH_SUCCESS.equals(streamCode))
                                    {
                                        netStream.close();
                                        flvVideo.release();

                                        // The result is published before the 'disconnected' flag, so a
                                        // thread that observes isDone() also observes the result.
                                        result = Boolean.TRUE;
                                        disconnected = true;
                                    }
                                }
                            });

                            netStream.play(flvVideo, stream);
                        }
                    });
                }
                else if (NetConnection.CONNECT_CLOSED.equals(code))
                {
                    flvVideo.release();

                    disconnected = true;
                }
                else
                {
                    // Any other connection status is reported to the caller as a failure.
                    result = new Exception((String) info.get(Status.DESCRIPTION));
                    disconnected = true;
                }
            }
        }

        // fields
        private final String url;
        private final Object[] args;
        private final Map<String, Object> configuration;
        private final Object stream;
        private final FlvVideo flvVideo;

        // assigned in run(), read by the connection callbacks
        private volatile ExecutorService dispatcher = null;

        // flags (written by callback threads, polled by run()/get())
        private volatile Object result = null;
        private volatile boolean disconnected = false;
        private volatile boolean cancelled = false;

        /**
         * Constructor.
         *
         * @param url server url
         * @param args connection arguments
         * @param configuration connection configuration
         * @param stream stream to download
         * @param file local file to store the stream
         * @throws Exception if an exception occurred
         */
        public DownloadTask(final String url, final Object[] args, final Map<String, Object> configuration,
                final Object stream, final String file) throws Exception
        {
            this.url = url;
            this.args = args;
            this.configuration = configuration;
            this.stream = stream;
            this.flvVideo = new FlvVideo(file, 512 * 1024, true /*sync*/);
        }

        public boolean isCancelled()
        {
            return cancelled;
        }

        public boolean isDone()
        {
            return cancelled || result != null || disconnected;
        }

        /**
         * Requests cancellation. The worker thread notices the flag on its next poll
         * and closes the connection.
         *
         * @param mayInterruptIfRunning ignored
         * @return <code>false</code> if the task is already done; otherwise <code>true</code>
         */
        public boolean cancel(final boolean mayInterruptIfRunning)
        {
            if (isDone()) return false;

            cancelled = true;
            disconnected = true;

            return true;
        }

        public Boolean get() throws InterruptedException, ExecutionException
        {
            while (!isDone())
            {
                Thread.sleep(GET_POLL_MILLIS);
            }

            return outcome();
        }

        /**
         * Waits at most <code>timeout</code> for the task to finish.
         *
         * @param timeout maximum time to wait
         * @param unit unit of the <code>timeout</code>
         * @return download result
         * @throws InterruptedException if the waiting thread was interrupted
         * @throws ExecutionException if the download failed
         * @throws TimeoutException if the task is not done when the timeout elapses
         */
        public Boolean get(final long timeout, final TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException
        {
            // Monotonic deadline; the subtraction below stays correct even if the sum wraps.
            final long deadline = System.nanoTime() + unit.toNanos(timeout);

            while (!isDone())
            {
                final long remainingNanos = deadline - System.nanoTime();

                if (remainingNanos <= 0L) throw new TimeoutException();

                final long remainingMillis = Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos));

                Thread.sleep(Math.min(GET_POLL_MILLIS, remainingMillis));
            }

            return outcome();
        }

        /**
         * Translates the final task state into the {@link Future} result.
         * Must be called only when {@link #isDone()} is <code>true</code>.
         *
         * @return download result
         * @throws ExecutionException if the task finished with an error or without a result
         */
        private Boolean outcome() throws ExecutionException
        {
            if (cancelled) throw new CancellationException();

            final Object outcome = result;

            if (outcome instanceof Boolean)
            {
                return (Boolean) outcome;
            }

            if (outcome instanceof Exception)
            {
                throw new ExecutionException((Exception) outcome);
            }

            // Finished without a result: an error callback or a connection close
            // arrived before the stream reported that it stopped.
            throw new ExecutionException(
                    new Exception("Connection was terminated before the download completed"));
        }

        /**
         * Connects to the server, waits until the task is done (finished, failed or
         * cancelled), then closes the connection and shuts the dispatcher down.
         */
        public void run()
        {
            dispatcher = Executors.newSingleThreadExecutor();

            NetConnection connection = null;

            try
            {
                connection = new NetConnection(configuration);

                connection.addEventListener(new NetConnectionListener(stream, flvVideo));

                UrlInfo info = UrlInfo.parseUrl(url);

                // A protocol name containing 'e' is replaced with plain RTMP
                // (presumably the encrypted variants such as rtmpe - TODO confirm).
                if (info.protocol != null && info.protocol.indexOf('e') >= 0)
                {
                    info = new UrlInfo(Protocol.RTMP, info.host, info.port, info.app, info.instance, info.scope, info.parameters);
                }

                connection.connect(info.toString(), args);

                while (!isDone())
                {
                    try
                    {
                        Thread.sleep(RUN_POLL_MILLIS);
                    }
                    catch (InterruptedException ignored)
                    {
                        // Deliberately ignored: the loop ends only when the task is done.
                    }
                }
            }
            catch (RuntimeException e)
            {
                // Without this the worker would die silently and get() would block forever.
                if (result == null) result = e;

                disconnected = true;
            }
            finally
            {
                try
                {
                    if (connection != null) connection.close();
                }
                finally
                {
                    dispatcher.shutdownNow();
                }
            }
        }
    }

    // fields
    private ILogger logger = null;

    /**
     * Constructor. Creates a downloader with debug mode disabled.
     */
    public RtmpDownloader()
    {
        this.setLogger(null);
        this.setDebugMode(false);
    }

    /**
     * Set <code>true</code> for debug mode.
     *
     * @param debug set <code>true</code> to enable debug mode; <code>false</code> to disable
     */
    public void setDebugMode(final boolean debug)
    {
        this.logger = debug ? Loggers.createSoLogger(ILogger.DEBUG, "Downloader") : Loggers.createNullLogger();
    }

    /**
     * Sets the logger.
     *
     * @param logger logger; <code>null</code> selects the null logger
     */
    private void setLogger(final ILogger logger)
    {
        this.logger = logger == null ? Loggers.createNullLogger() : logger;
    }

    /**
     * Downloads <code>stream</code> from the <code>url</code> and
     * saves it to the <code>file</code>. The download runs on a newly started thread.
     * <p> Note: Connection configuration fields are:
     * <br> "fpad" - (default: false)
     * <br> "pageUrl" - page url (default: "")
     * <br> "swfUrl" - SWF url (default: "")
     * <br> "flashVer" - flash version (default: "WIN 9,0,124,0")
     * <br> "audioCodecs" - audio codecs (default: 615)
     * <br> "videoCodecs" - video codecs (default: 124)
     * <br> "videoFunction" - video function (default: 1)
     * <br>
     * <br> <code>stream</code> parameter can be either String (name of the stream) or
     * an array, please check developer guide for details.
     *
     * @param url connection url
     * @param args connection arguments
     * @param configuration connection configuration (may be <code>null</code>; it is not modified)
     * @param stream stream to record
     * @param file local file to store stream
     * @return future of the download; it yields <code>true</code> when the stream was
     * downloaded and throws {@link ExecutionException} when the download failed
     * @throws Exception if an exception occurred
     */
    public Future<Boolean> download(final String url, final Object[] args, final Map<String, Object> configuration,
            final Object stream, final String file) throws Exception
    {
        final DownloadTask task = new DownloadTask(url, args, prepareConfiguration(configuration), stream, file);

        new Thread(task).start();

        return task;
    }

    /**
     * Prepares <code>configuration</code>: sets the logger bean and fills in the
     * default buffer sizes that are not set.
     *
     * @param configuration configuration provided by the caller, may be <code>null</code>
     * @return new configuration map; the caller's map is left untouched
     */
    private Map<String, Object> prepareConfiguration(final Map<String, Object> configuration)
    {
        // Work on a copy so that the caller's map is neither mutated nor required to be mutable.
        final Map<String, Object> conf = configuration == null
                ? new HashMap<String, Object>()
                : new HashMap<String, Object>(configuration);

        // beans
        conf.put(INetConnection.Configuration.LOGGER, logger);

        // setDefaultIfNotSet(conf, "IOTimeout", 75);
        setDefaultIfNotSet(conf, INetConnection.Configuration.RECEIVE_BUFFER_SIZE, 128 * 1024);
        setDefaultIfNotSet(conf, INetConnection.Configuration.SEND_BUFFER_SIZE, 16 * 1024);
        setDefaultIfNotSet(conf, INetConnection.Configuration.STREAM_BUFFER_SIZE, 4 * 128 * 1024);

        return conf;
    }

    /**
     * Sets default value if the value is not set as parameter.
     *
     * @param configuration configuration to update; ignored if <code>null</code>
     * @param property property name
     * @param defaultValue value stored when the property has no (non-null) value
     */
    private void setDefaultIfNotSet(final Map<String, Object> configuration, final String property, final Object defaultValue)
    {
        if (configuration == null || configuration.get(property) != null) return;

        configuration.put(property, defaultValue);
    }
}